atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add incoming offer discovery via push notification

When Node A offers to replicate Node B's data, Node A now resolves
Node B's org.p2pds.peer record to find their p2pds endpoint URL and
POSTs a notification. Node B verifies that the offer exists in Node A's
repo (anti-spoofing), stores it in an incoming_offers table, and
shows it in the dashboard with Accept/Reject buttons. Accepting
creates a reciprocal offer which triggers mutual agreement detection.

- Add PUBLIC_URL config + endpoint field to peer record
- Add endpoint to PeerInfo type in peer-discovery
- Add incoming_offers table + CRUD methods in sync-storage
- Add push notification in offerDid(); add acceptOffer/rejectOffer
methods to ReplicationManager
- Add notifyOffer (unauthenticated), acceptOffer, rejectOffer XRPC
endpoints with rate limiting
- Add incoming offers UI section in dashboard

+725 -210
+1 -1
src/bidirectional-replication.test.ts
··· 49 49 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 50 50 RATE_LIMIT_MAX_CONNECTIONS: 100, 51 51 RATE_LIMIT_FIREHOSE_PER_IP: 3, 52 - OAUTH_ENABLED: false, 52 + OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 53 53 }; 54 54 } 55 55
+1 -1
src/capstone-e2e.test.ts
··· 47 47 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 48 48 RATE_LIMIT_MAX_CONNECTIONS: 100, 49 49 RATE_LIMIT_FIREHOSE_PER_IP: 3, 50 - OAUTH_ENABLED: false, 50 + OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 51 51 }; 52 52 } 53 53
+26 -18
src/config.ts
··· 1 1 import { readFileSync, existsSync } from "node:fs"; 2 + import { randomBytes } from "node:crypto"; 2 3 import { resolve } from "node:path"; 3 4 import type { PolicySet } from "./policy/types.js"; 4 5 ··· 7 8 DID?: string; 8 9 /** Social account handle (optional). */ 9 10 HANDLE?: string; 10 - PDS_HOSTNAME: string; 11 + PDS_HOSTNAME?: string; 11 12 AUTH_TOKEN: string; 12 13 /** Social account signing key hex (optional). */ 13 14 SIGNING_KEY?: string; 14 15 /** Social account signing key public multibase (optional). */ 15 16 SIGNING_KEY_PUBLIC?: string; 16 - JWT_SECRET: string; 17 - PASSWORD_HASH: string; 17 + JWT_SECRET?: string; 18 + PASSWORD_HASH?: string; 18 19 EMAIL?: string; 19 20 DATA_DIR: string; 20 21 PORT: number; ··· 36 37 RATE_LIMIT_FIREHOSE_PER_IP: number; 37 38 /** Whether OAuth login is enabled for remote PDS publishing (default false). */ 38 39 OAUTH_ENABLED: boolean; 40 + /** Public URL of this p2pds instance, used for push notifications between nodes. */ 41 + PUBLIC_URL: string; 39 42 } 40 43 41 - const REQUIRED_KEYS = [ 44 + /** Required when OAuth is disabled (legacy mode). With OAuth, identity comes from login. */ 45 + const LEGACY_REQUIRED_KEYS = [ 42 46 "PDS_HOSTNAME", 43 47 "AUTH_TOKEN", 44 48 "JWT_SECRET", ··· 89 93 const dotenvPath = envPath ?? process.env.DOTENV_PATH ?? 
resolve(process.cwd(), ".env"); 90 94 loadDotEnv(dotenvPath); 91 95 92 - // Validate required variables 93 - const missing: string[] = []; 94 - for (const key of REQUIRED_KEYS) { 95 - if (!process.env[key]) { 96 - missing.push(key); 96 + // Validate required variables (legacy auth fields only required without OAuth) 97 + const oauthEnabled = process.env.OAUTH_ENABLED === "true"; 98 + if (!oauthEnabled) { 99 + const missing: string[] = []; 100 + for (const key of LEGACY_REQUIRED_KEYS) { 101 + if (!process.env[key]) { 102 + missing.push(key); 103 + } 97 104 } 98 - } 99 - if (missing.length > 0) { 100 - throw new Error( 101 - `Missing required environment variables: ${missing.join(", ")}`, 102 - ); 105 + if (missing.length > 0) { 106 + throw new Error( 107 + `Missing required environment variables: ${missing.join(", ")}`, 108 + ); 109 + } 103 110 } 104 111 105 - const pdsHostname = process.env.PDS_HOSTNAME!; 112 + const pdsHostname = process.env.PDS_HOSTNAME || undefined; 106 113 107 114 return { 108 115 DID: process.env.DID || undefined, 109 116 HANDLE: process.env.HANDLE || undefined, 110 117 PDS_HOSTNAME: pdsHostname, 111 - AUTH_TOKEN: process.env.AUTH_TOKEN!, 118 + AUTH_TOKEN: process.env.AUTH_TOKEN || randomBytes(32).toString("hex"), 112 119 SIGNING_KEY: process.env.SIGNING_KEY || undefined, 113 120 SIGNING_KEY_PUBLIC: process.env.SIGNING_KEY_PUBLIC || undefined, 114 - JWT_SECRET: process.env.JWT_SECRET!, 115 - PASSWORD_HASH: process.env.PASSWORD_HASH!, 121 + JWT_SECRET: process.env.JWT_SECRET || undefined, 122 + PASSWORD_HASH: process.env.PASSWORD_HASH || undefined, 116 123 EMAIL: process.env.EMAIL, 117 124 DATA_DIR: process.env.DATA_DIR ?? "./data", 118 125 PORT: parseInt(process.env.PORT ?? "3000", 10), ··· 131 138 RATE_LIMIT_MAX_CONNECTIONS: parseInt(process.env.RATE_LIMIT_MAX_CONNECTIONS ?? "100", 10), 132 139 RATE_LIMIT_FIREHOSE_PER_IP: parseInt(process.env.RATE_LIMIT_FIREHOSE_PER_IP ?? 
"3", 10), 133 140 OAUTH_ENABLED: process.env.OAUTH_ENABLED === "true", 141 + PUBLIC_URL: process.env.PUBLIC_URL || `http://localhost:${parseInt(process.env.PORT ?? "3000", 10)}`, 134 142 }; 135 143 } 136 144
+1 -1
src/didless-startup.test.ts
··· 47 47 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 48 48 RATE_LIMIT_MAX_CONNECTIONS: 100, 49 49 RATE_LIMIT_FIREHOSE_PER_IP: 3, 50 - OAUTH_ENABLED: false, 50 + OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 51 51 }; 52 52 } 53 53
+20
src/index.ts
··· 161 161 }), 162 162 ); 163 163 164 + // Offer notification endpoint (unauthenticated, rate-limited) 165 + app.use( 166 + "/xrpc/org.p2pds.replication.notifyOffer", 167 + rateLimitMiddleware(rateLimiter, { 168 + pool: "notifyOffer", 169 + rule: { maxRequests: config.RATE_LIMIT_CHALLENGE_PER_MIN, windowMs: w }, 170 + }), 171 + ); 172 + 164 173 // App endpoints 165 174 const appRL = rateLimitMiddleware(rateLimiter, { 166 175 pool: "app", ··· 635 644 return c.json({ offers, agreements }); 636 645 }); 637 646 647 + // Incoming offer notification (unauthenticated — other nodes don't have our auth token) 648 + app.post("/xrpc/org.p2pds.replication.notifyOffer", (c) => 649 + app_routes.notifyOffer(c, getConfigDid(), replicationManager), 650 + ); 651 + 638 652 app.post("/xrpc/org.p2pds.replication.revokeOffer", requireAuth, async (c) => { 639 653 if (!replicationManager) { 640 654 return c.json({ error: "ReplicationNotEnabled", message: "Replication is not enabled" }, 400); ··· 680 694 ); 681 695 app.post("/xrpc/org.p2pds.app.removeDid", requireAuth, (c) => 682 696 app_routes.removeDid(c, replicationManager), 697 + ); 698 + app.post("/xrpc/org.p2pds.app.acceptOffer", requireAuth, (c) => 699 + app_routes.acceptOffer(c, replicationManager), 700 + ); 701 + app.post("/xrpc/org.p2pds.app.rejectOffer", requireAuth, (c) => 702 + app_routes.rejectOffer(c, replicationManager), 683 703 ); 684 704 685 705 // ============================================
+6 -5
src/ipfs.test.ts
··· 51 51 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 52 52 RATE_LIMIT_MAX_CONNECTIONS: 100, 53 53 RATE_LIMIT_FIREHOSE_PER_IP: 3, 54 - OAUTH_ENABLED: false, 54 + OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 55 55 }; 56 56 } 57 57 ··· 61 61 62 62 describe("IpfsService", () => { 63 63 let tmpDir: string; 64 + let db: InstanceType<typeof Database>; 64 65 let service: IpfsService; 65 66 66 67 beforeEach(() => { 67 68 tmpDir = mkdtempSync(join(tmpdir(), "ipfs-test-")); 69 + db = new Database(join(tmpDir, "test.db")); 68 70 service = new IpfsService({ 69 - blocksPath: join(tmpDir, "blocks"), 70 - datastorePath: join(tmpDir, "datastore"), 71 + db, 71 72 networking: false, 72 73 }); 73 74 }); ··· 76 77 if (service.isRunning()) { 77 78 await service.stop(); 78 79 } 80 + db.close(); 79 81 rmSync(tmpDir, { recursive: true, force: true }); 80 82 }); 81 83 ··· 250 252 const firehose = new Firehose(repoManager); 251 253 252 254 ipfsService = new IpfsService({ 253 - blocksPath: join(tmpDir, "ipfs-blocks"), 254 - datastorePath: join(tmpDir, "ipfs-datastore"), 255 + db, 255 256 networking: false, 256 257 }); 257 258 await ipfsService.start();
+4 -2
src/oauth/routes.ts
··· 121 121 122 122 // Publish peer record on successful auth 123 123 try { 124 - await publishPeerRecord(pdsClientRef.current, networkService); 124 + await publishPeerRecord(pdsClientRef.current, networkService, config.PUBLIC_URL); 125 125 } catch (err) { 126 126 console.warn( 127 127 "[oauth] Failed to publish peer record:", ··· 135 135 onIdentityEstablished() 136 136 .then(async () => { 137 137 try { 138 - await publishPeerRecord(pdsClientRef.current!, networkService); 138 + await publishPeerRecord(pdsClientRef.current!, networkService, config.PUBLIC_URL); 139 139 } catch (err) { 140 140 console.warn( 141 141 "[oauth] Failed to re-publish peer record after IPFS start:", ··· 280 280 async function publishPeerRecord( 281 281 pdsClient: PdsClient, 282 282 networkService?: NetworkService, 283 + publicUrl?: string, 283 284 ): Promise<void> { 284 285 const peerId = networkService?.getPeerId() ?? null; 285 286 const multiaddrs = networkService?.getMultiaddrs() ?? []; ··· 288 289 $type: "org.p2pds.peer", 289 290 peerId, 290 291 multiaddrs, 292 + endpoint: publicUrl ?? null, 291 293 createdAt: new Date().toISOString(), 292 294 }); 293 295 }
+2 -4
src/replication/blob-replication.test.ts
··· 186 186 syncStorage.initSchema(); 187 187 188 188 ipfsService = new IpfsService({ 189 - blocksPath: join(tmpDir, "blocks"), 190 - datastorePath: join(tmpDir, "datastore"), 189 + db, 191 190 networking: false, 192 191 }); 193 192 await ipfsService.start(); ··· 252 251 syncStorage.initSchema(); 253 252 254 253 ipfsService = new IpfsService({ 255 - blocksPath: join(tmpDir, "blocks"), 256 - datastorePath: join(tmpDir, "datastore"), 254 + db, 257 255 networking: false, 258 256 }); 259 257 await ipfsService.start();
+2 -3
src/replication/challenge-response/challenge-response.test.ts
··· 43 43 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 44 44 RATE_LIMIT_MAX_CONNECTIONS: 100, 45 45 RATE_LIMIT_FIREHOSE_PER_IP: 3, 46 - OAUTH_ENABLED: false, 46 + OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 47 47 }; 48 48 } 49 49 ··· 59 59 60 60 db = new Database(join(tmpDir, "test.db")); 61 61 ipfsService = new IpfsService({ 62 - blocksPath: join(tmpDir, "ipfs-blocks"), 63 - datastorePath: join(tmpDir, "ipfs-datastore"), 62 + db, 64 63 networking: false, 65 64 }); 66 65 await ipfsService.start();
+18 -19
src/replication/challenge-response/e2e-challenge.test.ts
··· 11 11 import { join } from "node:path"; 12 12 import Database from "better-sqlite3"; 13 13 import type { Helia } from "@helia/interface"; 14 - import { FsBlockstore } from "blockstore-fs"; 15 - import { FsDatastore } from "datastore-fs"; 16 14 import type { Libp2p } from "@libp2p/interface"; 15 + import { SqliteBlockstore } from "../../sqlite-blockstore.js"; 16 + import { SqliteDatastore } from "../../sqlite-datastore.js"; 17 17 import { readCarWithRoot } from "@atproto/repo"; 18 18 19 19 import { IpfsService } from "../../ipfs.js"; ··· 51 51 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 52 52 RATE_LIMIT_MAX_CONNECTIONS: 100, 53 53 RATE_LIMIT_FIREHOSE_PER_IP: 3, 54 - OAUTH_ENABLED: false, 54 + OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 55 55 }; 56 56 } 57 57 ··· 59 59 * Create a minimal Helia node with TCP-only networking on localhost. 60 60 */ 61 61 async function createTestHeliaNode( 62 - blocksPath: string, 63 - datastorePath: string, 62 + db: Database.Database, 64 63 ): Promise<Helia> { 65 64 const { createHelia } = await import("helia"); 66 65 const { noise } = await import("@chainsafe/libp2p-noise"); ··· 71 70 const { libp2pRouting } = await import("@helia/routers"); 72 71 const { createLibp2p } = await import("libp2p"); 73 72 74 - const blockstore = new FsBlockstore(blocksPath); 75 - const datastore = new FsDatastore(datastorePath); 73 + const blockstore = new SqliteBlockstore(db); 74 + const datastore = new SqliteDatastore(db); 76 75 77 76 const libp2p = await createLibp2p({ 78 77 addresses: { ··· 88 87 89 88 const helia = await createHelia({ 90 89 libp2p, 91 - blockstore, 92 - datastore, 90 + blockstore: blockstore as any, 91 + datastore: datastore as any, 93 92 blockBrokers: [bitswap()], 94 93 routers: [libp2pRouting(libp2p)], 95 94 }); ··· 114 113 let tmpDir: string; 115 114 let nodeA: Helia | null = null; 116 115 let nodeB: Helia | null = null; 116 + let nodeDbA: Database.Database | null = null; 117 + let nodeDbB: Database.Database | null = 
null; 117 118 let transportA: Libp2pChallengeTransport | null = null; 118 119 let transportB: Libp2pChallengeTransport | null = null; 119 120 let db: InstanceType<typeof Database>; ··· 127 128 // Set up repo + local IpfsService (networking=false) for test data 128 129 db = new Database(join(tmpDir, "test.db")); 129 130 ipfsService = new IpfsService({ 130 - blocksPath: join(tmpDir, "ipfs-blocks"), 131 - datastorePath: join(tmpDir, "ipfs-datastore"), 131 + db, 132 132 networking: false, 133 133 }); 134 134 await ipfsService.start(); ··· 160 160 transportB = null; 161 161 nodeA = null; 162 162 nodeB = null; 163 + 164 + if (nodeDbA) { nodeDbA.close(); nodeDbA = null; } 165 + if (nodeDbB) { nodeDbB.close(); nodeDbB = null; } 163 166 164 167 if (ipfsService.isRunning()) { 165 168 await ipfsService.stop(); ··· 205 208 * Also copies all repo blocks to node A's blockstore so it can respond to challenges. 206 209 */ 207 210 async function setupNodes(): Promise<void> { 208 - nodeA = await createTestHeliaNode( 209 - join(tmpDir, "a-blocks"), 210 - join(tmpDir, "a-datastore"), 211 - ); 212 - nodeB = await createTestHeliaNode( 213 - join(tmpDir, "b-blocks"), 214 - join(tmpDir, "b-datastore"), 215 - ); 211 + nodeDbA = new Database(join(tmpDir, "node-a.db")); 212 + nodeDbB = new Database(join(tmpDir, "node-b.db")); 213 + nodeA = await createTestHeliaNode(nodeDbA); 214 + nodeB = await createTestHeliaNode(nodeDbB); 216 215 217 216 // Connect B -> A 218 217 await nodeB.libp2p.dial(nodeA.libp2p.getMultiaddrs()[0]!);
+18 -19
src/replication/e2e-multi-node.test.ts
··· 17 17 18 18 import { IpfsService, commitTopic, type CommitNotification } from "../ipfs.js"; 19 19 import type { BlockStore } from "../ipfs.js"; 20 + import { SqliteBlockstore } from "../sqlite-blockstore.js"; 21 + import { SqliteDatastore } from "../sqlite-datastore.js"; 20 22 import { RepoManager } from "../repo-manager.js"; 21 23 import type { Config } from "../config.js"; 22 24 import { encode as cborEncode, decode as cborDecode } from "../cbor-compat.js"; ··· 56 58 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 57 59 RATE_LIMIT_MAX_CONNECTIONS: 100, 58 60 RATE_LIMIT_FIREHOSE_PER_IP: 3, 59 - OAUTH_ENABLED: false, 61 + OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 60 62 }; 61 63 } 62 64 ··· 65 67 * Supports both gossipsub pub/sub and custom protocol streams. 66 68 */ 67 69 async function createGossipsubChallengeNode( 68 - blocksPath: string, 69 - datastorePath: string, 70 + db: Database.Database, 70 71 ): Promise<Helia> { 71 72 const { createHelia } = await import("helia"); 72 73 const { noise } = await import("@chainsafe/libp2p-noise"); ··· 75 76 const { identify } = await import("@libp2p/identify"); 76 77 const { gossipsub } = await import("@libp2p/gossipsub"); 77 78 const { createLibp2p } = await import("libp2p"); 78 - const { FsBlockstore } = await import("blockstore-fs"); 79 - const { FsDatastore } = await import("datastore-fs"); 80 79 81 - const blockstore = new FsBlockstore(blocksPath); 82 - const datastore = new FsDatastore(datastorePath); 80 + const blockstore = new SqliteBlockstore(db); 81 + const datastore = new SqliteDatastore(db); 83 82 84 83 const libp2p = await createLibp2p({ 85 84 addresses: { ··· 99 98 100 99 return createHelia({ 101 100 libp2p, 102 - blockstore, 103 - datastore, 101 + blockstore: blockstore as any, 102 + datastore: datastore as any, 104 103 }); 105 104 } 106 105 ··· 176 175 let repoManager: RepoManager; 177 176 let nodeA: Helia | null = null; 178 177 let nodeB: Helia | null = null; 178 + let nodeDbA: Database.Database | null 
= null; 179 + let nodeDbB: Database.Database | null = null; 179 180 let transportA: Libp2pChallengeTransport | null = null; 180 181 let transportB: Libp2pChallengeTransport | null = null; 181 182 ··· 185 186 186 187 db = new Database(join(tmpDir, "test.db")); 187 188 ipfsService = new IpfsService({ 188 - blocksPath: join(tmpDir, "ipfs-blocks"), 189 - datastorePath: join(tmpDir, "ipfs-datastore"), 189 + db, 190 190 networking: false, 191 191 }); 192 192 await ipfsService.start(); ··· 219 219 nodeA = null; 220 220 nodeB = null; 221 221 222 + if (nodeDbA) { nodeDbA.close(); nodeDbA = null; } 223 + if (nodeDbB) { nodeDbB.close(); nodeDbB = null; } 224 + 222 225 if (ipfsService.isRunning()) { 223 226 await ipfsService.stop(); 224 227 } ··· 248 251 * and create Libp2pChallengeTransport on both. 249 252 */ 250 253 async function setupConnectedNodes(): Promise<void> { 251 - nodeA = await createGossipsubChallengeNode( 252 - join(tmpDir, "a-blocks"), 253 - join(tmpDir, "a-datastore"), 254 - ); 255 - nodeB = await createGossipsubChallengeNode( 256 - join(tmpDir, "b-blocks"), 257 - join(tmpDir, "b-datastore"), 258 - ); 254 + nodeDbA = new Database(join(tmpDir, "node-a.db")); 255 + nodeDbB = new Database(join(tmpDir, "node-b.db")); 256 + nodeA = await createGossipsubChallengeNode(nodeDbA); 257 + nodeB = await createGossipsubChallengeNode(nodeDbB); 259 258 260 259 // Connect B -> A 261 260 await nodeB.libp2p.dial(nodeA.libp2p.getMultiaddrs()[0]!);
+35 -44
src/replication/e2e-networking.test.ts
··· 12 12 import { mkdtempSync, rmSync } from "node:fs"; 13 13 import { tmpdir } from "node:os"; 14 14 import { join } from "node:path"; 15 + import Database from "better-sqlite3"; 15 16 import { CID } from "multiformats"; 16 17 // @ts-ignore -- multiformats v9 subpath exports lack type declarations 17 18 import * as raw from "multiformats/codecs/raw"; 18 19 // @ts-ignore -- multiformats v9 subpath exports lack type declarations 19 20 import { sha256 } from "multiformats/hashes/sha2"; 20 21 import type { Helia } from "@helia/interface"; 21 - import { FsBlockstore } from "blockstore-fs"; 22 - import { FsDatastore } from "datastore-fs"; 22 + import { SqliteBlockstore } from "../sqlite-blockstore.js"; 23 + import { SqliteDatastore } from "../sqlite-datastore.js"; 23 24 24 25 /** 25 26 * Create a CID from raw bytes using SHA-256. ··· 66 67 * network dependencies. Nodes must be connected manually via dial(). 67 68 */ 68 69 async function createTestHeliaNode( 69 - blocksPath: string, 70 - datastorePath: string, 70 + db: Database.Database, 71 71 ): Promise<Helia> { 72 72 const { createHelia } = await import("helia"); 73 73 const { noise } = await import("@chainsafe/libp2p-noise"); ··· 78 78 const { libp2pRouting } = await import("@helia/routers"); 79 79 const { createLibp2p } = await import("libp2p"); 80 80 81 - const blockstore = new FsBlockstore(blocksPath); 82 - const datastore = new FsDatastore(datastorePath); 81 + const blockstore = new SqliteBlockstore(db); 82 + const datastore = new SqliteDatastore(db); 83 83 84 84 const libp2p = await createLibp2p({ 85 85 addresses: { ··· 96 96 97 97 const helia = await createHelia({ 98 98 libp2p, 99 - blockstore, 100 - datastore, 99 + blockstore: blockstore as any, 100 + datastore: datastore as any, 101 101 blockBrokers: [bitswap()], 102 102 routers: [libp2pRouting(libp2p)], 103 103 }); ··· 125 125 let tmpDir: string; 126 126 let nodeA: Helia | null = null; 127 127 let nodeB: Helia | null = null; 128 + let dbA: Database.Database 
| null = null; 129 + let dbB: Database.Database | null = null; 128 130 129 131 beforeEach(() => { 130 132 tmpDir = mkdtempSync(join(tmpdir(), "e2e-networking-test-")); ··· 139 141 nodeA = null; 140 142 nodeB = null; 141 143 144 + if (dbA) { dbA.close(); dbA = null; } 145 + if (dbB) { dbB.close(); dbB = null; } 146 + 142 147 rmSync(tmpDir, { recursive: true, force: true }); 143 148 }); 144 149 145 150 it("nodes can connect and exchange blocks via bitswap", { timeout: 60_000 }, async () => { 146 151 // 1. Create two Helia nodes with real TCP networking on localhost 147 - nodeA = await createTestHeliaNode( 148 - join(tmpDir, "a-blocks"), 149 - join(tmpDir, "a-datastore"), 150 - ); 151 - nodeB = await createTestHeliaNode( 152 - join(tmpDir, "b-blocks"), 153 - join(tmpDir, "b-datastore"), 154 - ); 152 + dbA = new Database(join(tmpDir, "node-a.db")); 153 + dbB = new Database(join(tmpDir, "node-b.db")); 154 + nodeA = await createTestHeliaNode(dbA); 155 + nodeB = await createTestHeliaNode(dbB); 155 156 156 157 // 2. Verify both nodes are running and have addresses 157 158 const addrsA = nodeA.libp2p.getMultiaddrs(); ··· 230 231 // exposes correct peer identity and multiaddr information. 
231 232 const { IpfsService } = await import("../ipfs.js"); 232 233 234 + const svcDbA = new Database(join(tmpDir, "svc-a.db")); 235 + const svcDbB = new Database(join(tmpDir, "svc-b.db")); 233 236 const serviceA = new IpfsService({ 234 - blocksPath: join(tmpDir, "svc-a-blocks"), 235 - datastorePath: join(tmpDir, "svc-a-datastore"), 237 + db: svcDbA, 236 238 networking: true, 237 239 }); 238 240 const serviceB = new IpfsService({ 239 - blocksPath: join(tmpDir, "svc-b-blocks"), 240 - datastorePath: join(tmpDir, "svc-b-datastore"), 241 + db: svcDbB, 241 242 networking: true, 242 243 }); 243 244 ··· 264 265 } finally { 265 266 if (serviceA.isRunning()) await serviceA.stop(); 266 267 if (serviceB.isRunning()) await serviceB.stop(); 268 + svcDbA.close(); 269 + svcDbB.close(); 267 270 } 268 271 }); 269 272 270 273 it("block stored on one node is retrievable from the other after connection", { timeout: 60_000 }, async () => { 271 274 // A focused test: one block, two nodes, verify bitswap fetch. 
272 - nodeA = await createTestHeliaNode( 273 - join(tmpDir, "single-a-blocks"), 274 - join(tmpDir, "single-a-datastore"), 275 - ); 276 - nodeB = await createTestHeliaNode( 277 - join(tmpDir, "single-b-blocks"), 278 - join(tmpDir, "single-b-datastore"), 279 - ); 275 + dbA = new Database(join(tmpDir, "single-a.db")); 276 + dbB = new Database(join(tmpDir, "single-b.db")); 277 + nodeA = await createTestHeliaNode(dbA); 278 + nodeB = await createTestHeliaNode(dbB); 280 279 281 280 // Connect 282 281 const addrsA = nodeA.libp2p.getMultiaddrs(); ··· 309 308 }); 310 309 311 310 it("nodes discover each other's peer IDs after connection", { timeout: 30_000 }, async () => { 312 - nodeA = await createTestHeliaNode( 313 - join(tmpDir, "disc-a-blocks"), 314 - join(tmpDir, "disc-a-datastore"), 315 - ); 316 - nodeB = await createTestHeliaNode( 317 - join(tmpDir, "disc-b-blocks"), 318 - join(tmpDir, "disc-b-datastore"), 319 - ); 311 + dbA = new Database(join(tmpDir, "disc-a.db")); 312 + dbB = new Database(join(tmpDir, "disc-b.db")); 313 + nodeA = await createTestHeliaNode(dbA); 314 + nodeB = await createTestHeliaNode(dbB); 320 315 321 316 const peerIdA = nodeA.libp2p.peerId; 322 317 const peerIdB = nodeB.libp2p.peerId; ··· 344 339 }); 345 340 346 341 it("bidirectional block exchange works", { timeout: 60_000 }, async () => { 347 - nodeA = await createTestHeliaNode( 348 - join(tmpDir, "bidir-a-blocks"), 349 - join(tmpDir, "bidir-a-datastore"), 350 - ); 351 - nodeB = await createTestHeliaNode( 352 - join(tmpDir, "bidir-b-blocks"), 353 - join(tmpDir, "bidir-b-datastore"), 354 - ); 342 + dbA = new Database(join(tmpDir, "bidir-a.db")); 343 + dbB = new Database(join(tmpDir, "bidir-b.db")); 344 + nodeA = await createTestHeliaNode(dbA); 345 + nodeB = await createTestHeliaNode(dbB); 355 346 356 347 // Connect 357 348 await nodeB.libp2p.dial(nodeA.libp2p.getMultiaddrs()[0]!);
+3 -5
src/replication/e2e-sync.test.ts
··· 52 52 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 53 53 RATE_LIMIT_MAX_CONNECTIONS: 100, 54 54 RATE_LIMIT_FIREHOSE_PER_IP: 3, 55 - OAUTH_ENABLED: false, 55 + OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 56 56 }; 57 57 } 58 58 ··· 104 104 repoManager.init(); 105 105 106 106 ipfsService = new IpfsService({ 107 - blocksPath: join(tmpDir, "blocks"), 108 - datastorePath: join(tmpDir, "datastore"), 107 + db, 109 108 networking: false, 110 109 }); 111 110 await ipfsService.start(); ··· 213 212 repoManager = new RepoManager(db, config); 214 213 repoManager.init(); 215 214 ipfsService = new IpfsService({ 216 - blocksPath: join(tmpDir, "blocks"), 217 - datastorePath: join(tmpDir, "datastore"), 215 + db, 218 216 networking: false, 219 217 }); 220 218 await ipfsService.start();
+4 -7
src/replication/firehose-incremental.test.ts
··· 70 70 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 71 71 RATE_LIMIT_MAX_CONNECTIONS: 100, 72 72 RATE_LIMIT_FIREHOSE_PER_IP: 3, 73 - OAUTH_ENABLED: false, 73 + OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 74 74 }; 75 75 } 76 76 ··· 168 168 169 169 db = new Database(join(tmpDir, "test.db")); 170 170 ipfsService = new IpfsService({ 171 - blocksPath: join(tmpDir, "ipfs-blocks"), 172 - datastorePath: join(tmpDir, "ipfs-datastore"), 171 + db, 173 172 networking: false, 174 173 }); 175 174 await ipfsService.start(); ··· 502 501 sourceConfig.DID = sourceDid; 503 502 sourceDb = new Database(join(tmpDir, "source.db")); 504 503 sourceIpfs = new IpfsService({ 505 - blocksPath: join(tmpDir, "source-ipfs-blocks"), 506 - datastorePath: join(tmpDir, "source-ipfs-datastore"), 504 + db: sourceDb, 507 505 networking: false, 508 506 }); 509 507 await sourceIpfs.start(); ··· 517 515 "0000000000000000000000000000000000000000000000000000000000000002"; 518 516 replicaDb = new Database(join(tmpDir, "replica.db")); 519 517 replicaIpfs = new IpfsService({ 520 - blocksPath: join(tmpDir, "replica-ipfs-blocks"), 521 - datastorePath: join(tmpDir, "replica-ipfs-datastore"), 518 + db: replicaDb, 522 519 networking: false, 523 520 }); 524 521 await replicaIpfs.start();
+21 -19
src/replication/gossipsub-notifications.test.ts
··· 9 9 import { mkdtempSync, rmSync } from "node:fs"; 10 10 import { tmpdir } from "node:os"; 11 11 import { join } from "node:path"; 12 + import Database from "better-sqlite3"; 12 13 import type { Helia } from "@helia/interface"; 14 + import { SqliteBlockstore } from "../sqlite-blockstore.js"; 15 + import { SqliteDatastore } from "../sqlite-datastore.js"; 13 16 import { 14 17 encode as cborEncode, 15 18 decode as cborDecode, ··· 68 71 let tmpDir: string; 69 72 let nodeA: Helia | null = null; 70 73 let nodeB: Helia | null = null; 74 + let dbA: Database.Database | null = null; 75 + let dbB: Database.Database | null = null; 71 76 72 77 beforeEach(() => { 73 78 tmpDir = mkdtempSync(join(tmpdir(), "gossipsub-e2e-test-")); ··· 80 85 await Promise.all(stops); 81 86 nodeA = null; 82 87 nodeB = null; 88 + 89 + if (dbA) { dbA.close(); dbA = null; } 90 + if (dbB) { dbB.close(); dbB = null; } 91 + 83 92 rmSync(tmpDir, { recursive: true, force: true }); 84 93 }); 85 94 ··· 88 97 * Strips out all discovery, relay, etc. — just TCP + noise + yamux + identify + gossipsub. 
89 98 */ 90 99 async function createGossipsubTestNode( 91 - blocksPath: string, 92 - datastorePath: string, 100 + db: Database.Database, 93 101 ): Promise<Helia> { 94 102 const { createHelia } = await import("helia"); 95 103 const { noise } = await import("@chainsafe/libp2p-noise"); ··· 98 106 const { identify } = await import("@libp2p/identify"); 99 107 const { gossipsub } = await import("@libp2p/gossipsub"); 100 108 const { createLibp2p } = await import("libp2p"); 101 - const { FsBlockstore } = await import("blockstore-fs"); 102 - const { FsDatastore } = await import("datastore-fs"); 103 109 104 - const blockstore = new FsBlockstore(blocksPath); 105 - const datastore = new FsDatastore(datastorePath); 110 + const blockstore = new SqliteBlockstore(db); 111 + const datastore = new SqliteDatastore(db); 106 112 107 113 const libp2p = await createLibp2p({ 108 114 addresses: { ··· 122 128 123 129 return createHelia({ 124 130 libp2p, 125 - blockstore, 126 - datastore, 131 + blockstore: blockstore as any, 132 + datastore: datastore as any, 127 133 }); 128 134 } 129 135 130 136 it("notification published by one node is received by connected peer", { timeout: 60_000 }, async () => { 131 - nodeA = await createGossipsubTestNode( 132 - join(tmpDir, "a-blocks"), 133 - join(tmpDir, "a-datastore"), 134 - ); 135 - nodeB = await createGossipsubTestNode( 136 - join(tmpDir, "b-blocks"), 137 - join(tmpDir, "b-datastore"), 138 - ); 137 + dbA = new Database(join(tmpDir, "node-a.db")); 138 + dbB = new Database(join(tmpDir, "node-b.db")); 139 + nodeA = await createGossipsubTestNode(dbA); 140 + nodeB = await createGossipsubTestNode(dbB); 139 141 140 142 // Connect the nodes 141 143 const addrsA = nodeA.libp2p.getMultiaddrs(); ··· 305 307 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 306 308 RATE_LIMIT_MAX_CONNECTIONS: 100, 307 309 RATE_LIMIT_FIREHOSE_PER_IP: 3, 308 - OAUTH_ENABLED: false, 310 + OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 309 311 }; 310 312 311 313 const { RepoManager } 
= await import("../repo-manager.js"); ··· 372 374 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 373 375 RATE_LIMIT_MAX_CONNECTIONS: 100, 374 376 RATE_LIMIT_FIREHOSE_PER_IP: 3, 375 - OAUTH_ENABLED: false, 377 + OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 376 378 }; 377 379 378 380 const { RepoManager } = await import("../repo-manager.js"); ··· 461 463 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 462 464 RATE_LIMIT_MAX_CONNECTIONS: 100, 463 465 RATE_LIMIT_FIREHOSE_PER_IP: 3, 464 - OAUTH_ENABLED: false, 466 + OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 465 467 }; 466 468 467 469 const { RepoManager } = await import("../repo-manager.js");
+2 -3
src/replication/incremental-sync.test.ts
··· 44 44 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 45 45 RATE_LIMIT_MAX_CONNECTIONS: 100, 46 46 RATE_LIMIT_FIREHOSE_PER_IP: 3, 47 - OAUTH_ENABLED: false, 47 + OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 48 48 }; 49 49 } 50 50 ··· 168 168 169 169 db = new Database(join(tmpDir, "test.db")); 170 170 ipfsService = new IpfsService({ 171 - blocksPath: join(tmpDir, "ipfs-blocks"), 172 - datastorePath: join(tmpDir, "ipfs-datastore"), 171 + db, 173 172 networking: false, 174 173 }); 175 174 await ipfsService.start();
+2 -3
src/replication/mst-proof.test.ts
··· 36 36 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 37 37 RATE_LIMIT_MAX_CONNECTIONS: 100, 38 38 RATE_LIMIT_FIREHOSE_PER_IP: 3, 39 - OAUTH_ENABLED: false, 39 + OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 40 40 }; 41 41 } 42 42 ··· 52 52 53 53 db = new Database(join(tmpDir, "test.db")); 54 54 ipfsService = new IpfsService({ 55 - blocksPath: join(tmpDir, "ipfs-blocks"), 56 - datastorePath: join(tmpDir, "ipfs-datastore"), 55 + db, 57 56 networking: false, 58 57 }); 59 58 await ipfsService.start();
+3 -5
src/replication/offer-manager.test.ts
··· 38 38 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 39 39 RATE_LIMIT_MAX_CONNECTIONS: 100, 40 40 RATE_LIMIT_FIREHOSE_PER_IP: 3, 41 - OAUTH_ENABLED: false, 41 + OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 42 42 }; 43 43 } 44 44 ··· 73 73 74 74 db = new Database(join(tmpDir, "test.db")); 75 75 ipfsService = new IpfsService({ 76 - blocksPath: join(tmpDir, "ipfs-blocks"), 77 - datastorePath: join(tmpDir, "ipfs-datastore"), 76 + db, 78 77 networking: false, 79 78 }); 80 79 await ipfsService.start(); ··· 401 400 const config2 = testConfig(tmpDir2, PEER_A_DID); 402 401 const db2 = new Database(join(tmpDir2, "test.db")); 403 402 const ipfsService2 = new IpfsService({ 404 - blocksPath: join(tmpDir2, "ipfs-blocks"), 405 - datastorePath: join(tmpDir2, "ipfs-datastore"), 403 + db: db2, 406 404 networking: false, 407 405 }); 408 406 await ipfsService2.start();
+7
src/replication/peer-discovery.ts
··· 9 9 pdsEndpoint: string; 10 10 peerId: string | null; 11 11 multiaddrs: string[]; 12 + /** The peer's p2pds HTTP endpoint URL (from org.p2pds.peer record). */ 13 + endpoint: string | null; 12 14 } 13 15 14 16 export class PeerDiscovery { ··· 36 38 pdsEndpoint, 37 39 peerId: null, 38 40 multiaddrs: [], 41 + endpoint: null, 39 42 }; 40 43 } 41 44 ··· 49 52 multiaddrs: Array.isArray(peerRecord.multiaddrs) 50 53 ? (peerRecord.multiaddrs as string[]) 51 54 : [], 55 + endpoint: 56 + typeof peerRecord.endpoint === "string" 57 + ? peerRecord.endpoint 58 + : null, 52 59 }; 53 60 } 54 61
+4 -1
src/replication/peer-freshness.test.ts
··· 52 52 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 53 53 RATE_LIMIT_MAX_CONNECTIONS: 100, 54 54 RATE_LIMIT_FIREHOSE_PER_IP: 3, 55 - OAUTH_ENABLED: false, 55 + OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 56 56 }; 57 57 } 58 58 ··· 214 214 pdsEndpoint: "https://pds.example.com", 215 215 peerId: "12D3KooWNewPeer", 216 216 multiaddrs: ["/ip4/5.6.7.8/tcp/4001"], 217 + endpoint: null, 217 218 }); 218 219 219 220 // Mock repo fetcher ··· 296 297 pdsEndpoint: "https://pds1.example.com", 297 298 peerId: "12D3KooWNewPeer1", 298 299 multiaddrs: ["/ip4/10.0.0.1/tcp/4001"], 300 + endpoint: null, 299 301 }); 300 302 301 303 // Refresh for pds1 endpoint only ··· 393 395 pdsEndpoint: "https://pds.example.com", 394 396 peerId: "12D3KooWPeerX", 395 397 multiaddrs: ["/ip4/1.2.3.4/tcp/4001"], 398 + endpoint: null, 396 399 }); 397 400 398 401 // Active connection has an additional observed addr
+7 -13
src/replication/policy-integration.test.ts
··· 60 60 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 61 61 RATE_LIMIT_MAX_CONNECTIONS: 100, 62 62 RATE_LIMIT_FIREHOSE_PER_IP: 3, 63 - OAUTH_ENABLED: false, 63 + OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 64 64 }; 65 65 } 66 66 ··· 96 96 tmpDir = mkdtempSync(join(tmpdir(), "policy-integration-test-")); 97 97 db = new Database(join(tmpDir, "test.db")); 98 98 ipfsService = new IpfsService({ 99 - blocksPath: join(tmpDir, "ipfs-blocks"), 100 - datastorePath: join(tmpDir, "ipfs-datastore"), 99 + db, 101 100 networking: false, 102 101 }); 103 102 await ipfsService.start(); ··· 311 310 tmpDir = mkdtempSync(join(tmpdir(), "policy-interval-test-")); 312 311 db = new Database(join(tmpDir, "test.db")); 313 312 ipfsService = new IpfsService({ 314 - blocksPath: join(tmpDir, "ipfs-blocks"), 315 - datastorePath: join(tmpDir, "ipfs-datastore"), 313 + db, 316 314 networking: false, 317 315 }); 318 316 await ipfsService.start(); ··· 459 457 tmpDir = mkdtempSync(join(tmpdir(), "policy-priority-test-")); 460 458 db = new Database(join(tmpDir, "test.db")); 461 459 ipfsService = new IpfsService({ 462 - blocksPath: join(tmpDir, "ipfs-blocks"), 463 - datastorePath: join(tmpDir, "ipfs-datastore"), 460 + db, 464 461 networking: false, 465 462 }); 466 463 await ipfsService.start(); ··· 579 576 tmpDir = mkdtempSync(join(tmpdir(), "policy-filter-test-")); 580 577 db = new Database(join(tmpDir, "test.db")); 581 578 ipfsService = new IpfsService({ 582 - blocksPath: join(tmpDir, "ipfs-blocks"), 583 - datastorePath: join(tmpDir, "ipfs-datastore"), 579 + db, 584 580 networking: false, 585 581 }); 586 582 await ipfsService.start(); ··· 707 703 tmpDir = mkdtempSync(join(tmpdir(), "policy-preset-test-")); 708 704 db = new Database(join(tmpDir, "test.db")); 709 705 ipfsService = new IpfsService({ 710 - blocksPath: join(tmpDir, "ipfs-blocks"), 711 - datastorePath: join(tmpDir, "ipfs-datastore"), 706 + db, 712 707 networking: false, 713 708 }); 714 709 await ipfsService.start(); ··· 864 859 tmpDir 
= mkdtempSync(join(tmpdir(), "policy-compat-test-")); 865 860 db = new Database(join(tmpDir, "test.db")); 866 861 ipfsService = new IpfsService({ 867 - blocksPath: join(tmpDir, "ipfs-blocks"), 868 - datastorePath: join(tmpDir, "ipfs-datastore"), 862 + db, 869 863 networking: false, 870 864 }); 871 865 await ipfsService.start();
+117
src/replication/replication-manager.ts
··· 282 282 // Store in offered_dids (does NOT create replication_state or trigger sync) 283 283 this.syncStorage.addOfferedDid(did, pdsEndpoint ?? null); 284 284 285 + // Push notification: tell the target node about this offer (fire-and-forget) 286 + this.pushOfferNotification(did).catch((err) => { 287 + console.warn( 288 + `[replication] Failed to push offer notification to ${did}:`, 289 + err instanceof Error ? err.message : String(err), 290 + ); 291 + }); 292 + 285 293 return { status: "offered" }; 286 294 } 287 295 ··· 300 308 301 309 this.syncStorage.removeOfferedDid(did); 302 310 return { status: "removed" }; 311 + } 312 + 313 + /** 314 + * Push an offer notification to the target node's p2pds endpoint. 315 + * Resolves the target's org.p2pds.peer record to find their endpoint URL, 316 + * then POSTs to their notifyOffer XRPC method. 317 + */ 318 + private async pushOfferNotification(targetDid: string): Promise<void> { 319 + const peerInfo = await this.peerDiscovery.discoverPeer(targetDid); 320 + if (!peerInfo?.endpoint) { 321 + console.log(`[replication] No p2pds endpoint found for ${targetDid}, skipping push notification`); 322 + return; 323 + } 324 + 325 + const url = `${peerInfo.endpoint}/xrpc/org.p2pds.replication.notifyOffer`; 326 + const body = { 327 + offererDid: this.config.DID, 328 + subjectDid: targetDid, 329 + offererPdsEndpoint: await this.repoFetcher.resolvePds(this.config.DID ?? 
""), 330 + params: { 331 + minCopies: 2, 332 + intervalSec: 600, 333 + priority: 50, 334 + }, 335 + }; 336 + 337 + const res = await fetch(url, { 338 + method: "POST", 339 + headers: { "Content-Type": "application/json" }, 340 + body: JSON.stringify(body), 341 + }); 342 + 343 + if (!res.ok) { 344 + const text = await res.text().catch(() => ""); 345 + console.warn(`[replication] Push notification to ${targetDid} failed (${res.status}): ${text}`); 346 + } else { 347 + console.log(`[replication] Push notification sent to ${targetDid} at ${peerInfo.endpoint}`); 348 + } 349 + } 350 + 351 + /** 352 + * Accept an incoming offer: create a reciprocal offer and remove from incoming_offers. 353 + */ 354 + async acceptOffer(offererDid: string): Promise<{ status: "accepted" | "not_found" | "error"; error?: string }> { 355 + // Find the incoming offer to get the subject DID 356 + const offers = this.syncStorage.getIncomingOffers(); 357 + const offer = offers.find((o) => o.offererDid === offererDid); 358 + if (!offer) { 359 + return { status: "not_found" }; 360 + } 361 + 362 + // Create reciprocal offer (which also push-notifies back) 363 + try { 364 + await this.offerDid(offererDid); 365 + } catch (err) { 366 + return { 367 + status: "error", 368 + error: `Failed to create reciprocal offer: ${err instanceof Error ? err.message : String(err)}`, 369 + }; 370 + } 371 + 372 + // Remove from incoming_offers 373 + this.syncStorage.removeIncomingOffer(offererDid, offer.subjectDid); 374 + 375 + return { status: "accepted" }; 376 + } 377 + 378 + /** 379 + * Reject an incoming offer: remove from incoming_offers without creating a reciprocal offer. 
380 + */ 381 + rejectOffer(offererDid: string): { status: "rejected" | "not_found" } { 382 + const offers = this.syncStorage.getIncomingOffers(); 383 + const offer = offers.find((o) => o.offererDid === offererDid); 384 + if (!offer) { 385 + return { status: "not_found" }; 386 + } 387 + 388 + this.syncStorage.removeIncomingOffer(offererDid, offer.subjectDid); 389 + return { status: "rejected" }; 390 + } 391 + 392 + /** 393 + * Get all incoming offers (from other nodes wanting to replicate our data). 394 + */ 395 + getIncomingOffers(): Array<{ 396 + offererDid: string; 397 + subjectDid: string; 398 + offererPdsEndpoint: string | null; 399 + offererEndpoint: string | null; 400 + minCopies: number; 401 + intervalSec: number; 402 + priority: number; 403 + receivedAt: string; 404 + }> { 405 + return this.syncStorage.getIncomingOffers(); 406 + } 407 + 408 + /** 409 + * Get the PeerDiscovery instance (for verifying incoming offers). 410 + */ 411 + getPeerDiscovery(): PeerDiscovery { 412 + return this.peerDiscovery; 413 + } 414 + 415 + /** 416 + * Get the RepoFetcher instance (for resolving PDS endpoints). 417 + */ 418 + getRepoFetcher(): RepoFetcher { 419 + return this.repoFetcher; 303 420 } 304 421 305 422 /**
+25 -31
src/replication/replication.test.ts
··· 63 63 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 64 64 RATE_LIMIT_MAX_CONNECTIONS: 100, 65 65 RATE_LIMIT_FIREHOSE_PER_IP: 3, 66 - OAUTH_ENABLED: false, 66 + OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 67 67 }; 68 68 } 69 69 ··· 288 288 289 289 db = new Database(join(tmpDir, "test.db")); 290 290 ipfsService = new IpfsService({ 291 - blocksPath: join(tmpDir, "ipfs-blocks"), 292 - datastorePath: join(tmpDir, "ipfs-datastore"), 291 + db, 293 292 networking: false, 294 293 }); 295 294 await ipfsService.start(); ··· 505 504 506 505 describe("BlockVerifier", () => { 507 506 let tmpDir: string; 507 + let db: InstanceType<typeof Database>; 508 508 let ipfsService: IpfsService; 509 509 510 510 beforeEach(async () => { 511 511 tmpDir = mkdtempSync(join(tmpdir(), "verifier-test-")); 512 + db = new Database(join(tmpDir, "test.db")); 512 513 ipfsService = new IpfsService({ 513 - blocksPath: join(tmpDir, "blocks"), 514 - datastorePath: join(tmpDir, "datastore"), 514 + db, 515 515 networking: false, 516 516 }); 517 517 await ipfsService.start(); ··· 521 521 if (ipfsService.isRunning()) { 522 522 await ipfsService.stop(); 523 523 } 524 + db.close(); 524 525 try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} 525 526 }); 526 527 ··· 603 604 const sourceConfig = testConfig(join(tmpDir, "source"), []); 604 605 sourceDb = new Database(join(tmpDir, "source.db")); 605 606 sourceIpfs = new IpfsService({ 606 - blocksPath: join(tmpDir, "source-ipfs-blocks"), 607 - datastorePath: join(tmpDir, "source-ipfs-datastore"), 607 + db: sourceDb, 608 608 networking: false, 609 609 }); 610 610 await sourceIpfs.start(); ··· 620 620 "0000000000000000000000000000000000000000000000000000000000000002"; 621 621 replicaDb = new Database(join(tmpDir, "replica.db")); 622 622 replicaIpfs = new IpfsService({ 623 - blocksPath: join(tmpDir, "replica-ipfs-blocks"), 624 - datastorePath: join(tmpDir, "replica-ipfs-datastore"), 623 + db: replicaDb, 625 624 networking: false, 626 625 }); 627 
626 await replicaIpfs.start(); ··· 841 840 842 841 describe("RemoteVerifier", () => { 843 842 let tmpDir: string; 843 + let db: InstanceType<typeof Database>; 844 844 let ipfsService: IpfsService; 845 845 846 846 beforeEach(async () => { 847 847 tmpDir = mkdtempSync(join(tmpdir(), "remote-verifier-test-")); 848 + db = new Database(join(tmpDir, "test.db")); 848 849 ipfsService = new IpfsService({ 849 - blocksPath: join(tmpDir, "blocks"), 850 - datastorePath: join(tmpDir, "datastore"), 850 + db, 851 851 networking: false, 852 852 }); 853 853 await ipfsService.start(); ··· 857 857 if (ipfsService.isRunning()) { 858 858 await ipfsService.stop(); 859 859 } 860 + db.close(); 860 861 try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} 861 862 }); 862 863 ··· 1216 1217 1217 1218 describe("IpfsReadableBlockstore", () => { 1218 1219 let tmpDir: string; 1220 + let db: InstanceType<typeof Database>; 1219 1221 let ipfsService: IpfsService; 1220 1222 let readableBlockstore: IpfsReadableBlockstore; 1221 1223 1222 1224 beforeEach(async () => { 1223 1225 tmpDir = mkdtempSync(join(tmpdir(), "ipfs-readable-bs-test-")); 1226 + db = new Database(join(tmpDir, "test.db")); 1224 1227 ipfsService = new IpfsService({ 1225 - blocksPath: join(tmpDir, "blocks"), 1226 - datastorePath: join(tmpDir, "datastore"), 1228 + db, 1227 1229 networking: false, 1228 1230 }); 1229 1231 await ipfsService.start(); ··· 1232 1234 1233 1235 afterEach(async () => { 1234 1236 if (ipfsService.isRunning()) await ipfsService.stop(); 1237 + db.close(); 1235 1238 try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} 1236 1239 }); 1237 1240 ··· 1306 1309 const sourceConfig = testConfig(join(tmpDir, "source"), []); 1307 1310 sourceDb = new Database(join(tmpDir, "source.db")); 1308 1311 sourceIpfs = new IpfsService({ 1309 - blocksPath: join(tmpDir, "source-ipfs-blocks"), 1310 - datastorePath: join(tmpDir, "source-ipfs-datastore"), 1312 + db: sourceDb, 1311 1313 networking: false, 1312 1314 }); 
1313 1315 await sourceIpfs.start(); ··· 1317 1319 // Replica IPFS + sync storage 1318 1320 replicaDb = new Database(join(tmpDir, "replica.db")); 1319 1321 replicaIpfs = new IpfsService({ 1320 - blocksPath: join(tmpDir, "replica-ipfs-blocks"), 1321 - datastorePath: join(tmpDir, "replica-ipfs-datastore"), 1322 + db: replicaDb, 1322 1323 networking: false, 1323 1324 }); 1324 1325 await replicaIpfs.start(); ··· 1534 1535 const sourceConfig = testConfig(join(tmpDir, "source"), []); 1535 1536 sourceDb = new Database(join(tmpDir, "source.db")); 1536 1537 sourceIpfs = new IpfsService({ 1537 - blocksPath: join(tmpDir, "source-ipfs-blocks"), 1538 - datastorePath: join(tmpDir, "source-ipfs-datastore"), 1538 + db: sourceDb, 1539 1539 networking: false, 1540 1540 }); 1541 1541 await sourceIpfs.start(); ··· 1549 1549 "0000000000000000000000000000000000000000000000000000000000000002"; 1550 1550 replicaDb = new Database(join(tmpDir, "replica.db")); 1551 1551 replicaIpfs = new IpfsService({ 1552 - blocksPath: join(tmpDir, "replica-ipfs-blocks"), 1553 - datastorePath: join(tmpDir, "replica-ipfs-datastore"), 1552 + db: replicaDb, 1554 1553 networking: false, 1555 1554 }); 1556 1555 await replicaIpfs.start(); ··· 1741 1740 const sourceConfig = testConfig(join(tmpDir, "source"), []); 1742 1741 sourceDb = new Database(join(tmpDir, "source.db")); 1743 1742 sourceIpfs = new IpfsService({ 1744 - blocksPath: join(tmpDir, "source-ipfs-blocks"), 1745 - datastorePath: join(tmpDir, "source-ipfs-datastore"), 1743 + db: sourceDb, 1746 1744 networking: false, 1747 1745 }); 1748 1746 await sourceIpfs.start(); ··· 1751 1749 1752 1750 replicaDb = new Database(join(tmpDir, "replica.db")); 1753 1751 replicaIpfs = new IpfsService({ 1754 - blocksPath: join(tmpDir, "replica-ipfs-blocks"), 1755 - datastorePath: join(tmpDir, "replica-ipfs-datastore"), 1752 + db: replicaDb, 1756 1753 networking: false, 1757 1754 }); 1758 1755 await replicaIpfs.start(); ··· 1952 1949 const sourceConfig = 
testConfig(join(tmpDir, "source"), []); 1953 1950 sourceDb = new Database(join(tmpDir, "source.db")); 1954 1951 sourceIpfs = new IpfsService({ 1955 - blocksPath: join(tmpDir, "source-ipfs-blocks"), 1956 - datastorePath: join(tmpDir, "source-ipfs-datastore"), 1952 + db: sourceDb, 1957 1953 networking: false, 1958 1954 }); 1959 1955 await sourceIpfs.start(); ··· 1967 1963 "0000000000000000000000000000000000000000000000000000000000000002"; 1968 1964 replicaDb = new Database(join(tmpDir, "replica.db")); 1969 1965 replicaIpfs = new IpfsService({ 1970 - blocksPath: join(tmpDir, "replica-ipfs-blocks"), 1971 - datastorePath: join(tmpDir, "replica-ipfs-datastore"), 1966 + db: replicaDb, 1972 1967 networking: false, 1973 1968 }); 1974 1969 await replicaIpfs.start(); ··· 2172 2167 2173 2168 db = new Database(join(tmpDir, "test.db")); 2174 2169 ipfsService = new IpfsService({ 2175 - blocksPath: join(tmpDir, "ipfs-blocks"), 2176 - datastorePath: join(tmpDir, "ipfs-datastore"), 2170 + db, 2177 2171 networking: false, 2178 2172 }); 2179 2173 await ipfsService.start();
+103
src/replication/sync-storage.ts
··· 84 84 ); 85 85 `); 86 86 87 + // Incoming offers table: tracks offers received from other nodes. 88 + this.db.exec(` 89 + CREATE TABLE IF NOT EXISTS incoming_offers ( 90 + offerer_did TEXT NOT NULL, 91 + subject_did TEXT NOT NULL, 92 + offerer_pds_endpoint TEXT, 93 + offerer_endpoint TEXT, 94 + min_copies INTEGER NOT NULL DEFAULT 2, 95 + interval_sec INTEGER NOT NULL DEFAULT 600, 96 + priority INTEGER NOT NULL DEFAULT 50, 97 + received_at TEXT NOT NULL DEFAULT (datetime('now')), 98 + PRIMARY KEY (offerer_did, subject_did) 99 + ); 100 + `); 101 + 87 102 // Offered DIDs table: tracks DIDs we've offered to replicate 88 103 // but don't yet have mutual consent for. 89 104 this.db.exec(` ··· 663 678 const row = this.db 664 679 .prepare("SELECT 1 FROM offered_dids WHERE did = ?") 665 680 .get(did); 681 + return row !== undefined; 682 + } 683 + 684 + // ============================================ 685 + // Incoming offer management 686 + // ============================================ 687 + 688 + /** 689 + * Add or update an incoming offer (idempotent upsert). 690 + */ 691 + addIncomingOffer(offer: { 692 + offererDid: string; 693 + subjectDid: string; 694 + offererPdsEndpoint?: string | null; 695 + offererEndpoint?: string | null; 696 + minCopies?: number; 697 + intervalSec?: number; 698 + priority?: number; 699 + }): void { 700 + this.db 701 + .prepare( 702 + `INSERT INTO incoming_offers (offerer_did, subject_did, offerer_pds_endpoint, offerer_endpoint, min_copies, interval_sec, priority) 703 + VALUES (?, ?, ?, ?, ?, ?, ?) 
704 + ON CONFLICT(offerer_did, subject_did) DO UPDATE SET 705 + offerer_pds_endpoint = COALESCE(excluded.offerer_pds_endpoint, incoming_offers.offerer_pds_endpoint), 706 + offerer_endpoint = COALESCE(excluded.offerer_endpoint, incoming_offers.offerer_endpoint), 707 + min_copies = excluded.min_copies, 708 + interval_sec = excluded.interval_sec, 709 + priority = excluded.priority, 710 + received_at = datetime('now')`, 711 + ) 712 + .run( 713 + offer.offererDid, 714 + offer.subjectDid, 715 + offer.offererPdsEndpoint ?? null, 716 + offer.offererEndpoint ?? null, 717 + offer.minCopies ?? 2, 718 + offer.intervalSec ?? 600, 719 + offer.priority ?? 50, 720 + ); 721 + } 722 + 723 + /** 724 + * Get all incoming offers. 725 + */ 726 + getIncomingOffers(): Array<{ 727 + offererDid: string; 728 + subjectDid: string; 729 + offererPdsEndpoint: string | null; 730 + offererEndpoint: string | null; 731 + minCopies: number; 732 + intervalSec: number; 733 + priority: number; 734 + receivedAt: string; 735 + }> { 736 + const rows = this.db 737 + .prepare("SELECT * FROM incoming_offers ORDER BY received_at DESC") 738 + .all() as Array<Record<string, unknown>>; 739 + return rows.map((r) => ({ 740 + offererDid: r.offerer_did as string, 741 + subjectDid: r.subject_did as string, 742 + offererPdsEndpoint: (r.offerer_pds_endpoint as string) ?? null, 743 + offererEndpoint: (r.offerer_endpoint as string) ?? null, 744 + minCopies: r.min_copies as number, 745 + intervalSec: r.interval_sec as number, 746 + priority: r.priority as number, 747 + receivedAt: r.received_at as string, 748 + })); 749 + } 750 + 751 + /** 752 + * Remove an incoming offer. 753 + * Returns true if the offer was actually removed. 754 + */ 755 + removeIncomingOffer(offererDid: string, subjectDid: string): boolean { 756 + const result = this.db 757 + .prepare("DELETE FROM incoming_offers WHERE offerer_did = ? 
AND subject_did = ?") 758 + .run(offererDid, subjectDid); 759 + return result.changes > 0; 760 + } 761 + 762 + /** 763 + * Check if an incoming offer exists. 764 + */ 765 + hasIncomingOffer(offererDid: string, subjectDid: string): boolean { 766 + const row = this.db 767 + .prepare("SELECT 1 FROM incoming_offers WHERE offerer_did = ? AND subject_did = ?") 768 + .get(offererDid, subjectDid); 666 769 return row !== undefined; 667 770 } 668 771
+1 -1
src/server-startup.test.ts
··· 47 47 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 48 48 RATE_LIMIT_MAX_CONNECTIONS: 100, 49 49 RATE_LIMIT_FIREHOSE_PER_IP: 3, 50 - OAUTH_ENABLED: false, 50 + OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 51 51 }; 52 52 } 53 53
+1 -1
src/two-node-didless.test.ts
··· 45 45 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 46 46 RATE_LIMIT_MAX_CONNECTIONS: 100, 47 47 RATE_LIMIT_FIREHOSE_PER_IP: 3, 48 - OAUTH_ENABLED: false, 48 + OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 49 49 }; 50 50 } 51 51
+2 -3
src/xrpc/app-e2e.test.ts
··· 53 53 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 54 54 RATE_LIMIT_MAX_CONNECTIONS: 100, 55 55 RATE_LIMIT_FIREHOSE_PER_IP: 3, 56 - OAUTH_ENABLED: false, 56 + OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 57 57 }; 58 58 } 59 59 ··· 132 132 ]); 133 133 134 134 ipfsB = new IpfsService({ 135 - blocksPath: join(tmpDir, "b-ipfs-blocks"), 136 - datastorePath: join(tmpDir, "b-ipfs-datastore"), 135 + db: dbB, 137 136 networking: false, 138 137 }); 139 138 await ipfsB.start();
+1 -1
src/xrpc/app.test.ts
··· 39 39 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 40 40 RATE_LIMIT_MAX_CONNECTIONS: 100, 41 41 RATE_LIMIT_FIREHOSE_PER_IP: 3, 42 - OAUTH_ENABLED: false, 42 + OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 43 43 }; 44 44 } 45 45
+288
src/xrpc/app.ts
··· 30 30 let policy: Record<string, unknown> | null = null; 31 31 let verification: { results: unknown[] } = { results: [] }; 32 32 let offeredDids: Array<{ did: string; pdsEndpoint: string | null; offeredAt: string }> = []; 33 + let incomingOffers: Array<Record<string, unknown>> = []; 33 34 34 35 if (replicationManager) { 35 36 const syncStates = replicationManager.getSyncStates(); ··· 40 41 didSources[s.did] = replicationManager.getDidSource(s.did) ?? "unknown"; 41 42 } 42 43 offeredDids = replicationManager.getOfferedDids(); 44 + incomingOffers = replicationManager.getIncomingOffers(); 43 45 replication = { 44 46 enabled: true, 45 47 trackedDids: syncStates.map((s) => s.did), ··· 76 78 policy, 77 79 verification, 78 80 offeredDids, 81 + incomingOffers, 79 82 }); 80 83 } 81 84 ··· 338 341 display: inline-flex; align-items: center; gap: 0.4rem; 339 342 color: #eab308; font-size: 0.8rem; font-weight: 600; 340 343 } 344 + .btn-accept { 345 + padding: 0.2rem 0.5rem; font-family: inherit; font-size: 0.72rem; 346 + border: 1px solid #22c55e; border-radius: 3px; cursor: pointer; 347 + background: var(--card-bg); color: #22c55e; 348 + } 349 + .btn-accept:hover { background: #22c55e; color: #fff; } 350 + .btn-reject { 351 + padding: 0.2rem 0.5rem; font-family: inherit; font-size: 0.72rem; 352 + border: 1px solid #ef4444; border-radius: 3px; cursor: pointer; 353 + background: var(--card-bg); color: #ef4444; 354 + } 355 + .btn-reject:hover { background: #ef4444; color: #fff; } 356 + .incoming-offer-row { 357 + display: flex; align-items: center; gap: 0.5rem; padding: 0.4rem 0; 358 + border-bottom: 1px solid var(--border); font-size: 0.8rem; 359 + } 360 + .incoming-offer-row:last-child { border-bottom: none; } 361 + .incoming-offer-info { flex: 1; min-width: 0; } 362 + .incoming-offer-actions { display: flex; gap: 0.3rem; flex-shrink: 0; } 341 363 .sync-gate-spinner { 342 364 display: inline-block; width: 10px; height: 10px; border: 2px solid #eab30844; 343 365 
border-top-color: #eab308; border-radius: 50%; animation: spin 0.8s linear infinite; ··· 381 403 <div id="account-content" class="loading">Loading...</div> 382 404 </section> 383 405 </div> 406 + 407 + <section class="card" id="section-incoming-offers" style="display:none"> 408 + <h2>Incoming Offers</h2> 409 + <div id="incoming-offers-content"></div> 410 + </section> 384 411 385 412 <section class="card" id="section-metrics"> 386 413 <h2>Replication Summary</h2> ··· 978 1005 }); 979 1006 } 980 1007 1008 + function renderIncomingOffers(data) { 1009 + var section = document.getElementById("section-incoming-offers"); 1010 + var el = document.getElementById("incoming-offers-content"); 1011 + var offers = data.incomingOffers || []; 1012 + if (offers.length === 0) { section.style.display = "none"; return; } 1013 + section.style.display = ""; 1014 + var html = ""; 1015 + for (var i = 0; i < offers.length; i++) { 1016 + var o = offers[i]; 1017 + var rowId = "incoming-" + o.offererDid.replace(/[^a-zA-Z0-9]/g, "_"); 1018 + html += '<div class="incoming-offer-row" id="' + rowId + '">' 1019 + + '<div class="incoming-offer-info" id="' + rowId + '-info">' 1020 + + '<span>' + esc(o.offererDid) + '</span>' 1021 + + ' <span style="color:var(--muted)">wants to replicate your data</span>' 1022 + + '</div>' 1023 + + '<div class="incoming-offer-actions">' 1024 + + '<button class="btn-accept" data-did="' + esc(o.offererDid) + '">Accept</button>' 1025 + + '<button class="btn-reject" data-did="' + esc(o.offererDid) + '">Reject</button>' 1026 + + '</div></div>'; 1027 + } 1028 + el.innerHTML = html; 1029 + 1030 + // Async profile resolution for offerer DIDs 1031 + for (var j = 0; j < offers.length; j++) { 1032 + (function(o) { 1033 + var infoId = "incoming-" + o.offererDid.replace(/[^a-zA-Z0-9]/g, "_") + "-info"; 1034 + fetchProfile(o.offererDid).then(function(p) { 1035 + var infoEl = document.getElementById(infoId); 1036 + if (infoEl && p) { 1037 + var av = p.avatar 1038 + ? 
'<img src="' + esc(p.avatar) + '" alt="" style="width:20px;height:20px;border-radius:50%;vertical-align:middle;margin-right:0.3rem">' 1039 + : ''; 1040 + infoEl.innerHTML = av 1041 + + '<strong>' + esc(p.displayName || p.handle) + '</strong>' 1042 + + ' <span style="color:var(--muted)">@' + esc(p.handle) + '</span>' 1043 + + ' <span style="color:var(--muted)">wants to replicate your data</span>'; 1044 + } 1045 + }); 1046 + })(offers[j]); 1047 + } 1048 + 1049 + // Accept handlers 1050 + el.querySelectorAll(".btn-accept").forEach(function(btn) { 1051 + btn.addEventListener("click", async function() { 1052 + var did = this.dataset.did; 1053 + try { 1054 + var result = await apiPost("org.p2pds.app.acceptOffer", { offererDid: did }); 1055 + if (result.error) { alert("Error: " + (result.message || result.error)); } 1056 + else { refresh(); } 1057 + } catch (err) { alert("Error: " + err.message); } 1058 + }); 1059 + }); 1060 + 1061 + // Reject handlers 1062 + el.querySelectorAll(".btn-reject").forEach(function(btn) { 1063 + btn.addEventListener("click", async function() { 1064 + var did = this.dataset.did; 1065 + if (!confirm("Reject offer from " + did + "?")) return; 1066 + try { 1067 + var result = await apiPost("org.p2pds.app.rejectOffer", { offererDid: did }); 1068 + if (result.error) { alert("Error: " + (result.message || result.error)); } 1069 + else { refresh(); } 1070 + } catch (err) { alert("Error: " + err.message); } 1071 + }); 1072 + }); 1073 + } 1074 + 981 1075 function gateAddDid(overview) { 982 1076 var input = document.getElementById("add-did-input"); 983 1077 var btn = document.getElementById("add-did-btn"); ··· 1026 1120 ]); 1027 1121 await refreshAccount(); 1028 1122 renderOverview(overview); 1123 + renderIncomingOffers(overview); 1029 1124 renderMetrics(overview); 1030 1125 renderReplication(overview); 1031 1126 renderSyncHistory(syncHistory); ··· 1344 1439 explicitDids: policyEngine.getExplicitDids(), 1345 1440 }); 1346 1441 } 1442 + 1443 + /** 1444 + 
* Handle an incoming offer notification from another node (unauthenticated). 1445 + * Verifies the offer exists in the offerer's repo before storing. 1446 + */ 1447 + export async function notifyOffer( 1448 + c: Context<AppEnv>, 1449 + nodeDid: string, 1450 + replicationManager: ReplicationManager | undefined, 1451 + ): Promise<Response> { 1452 + if (!replicationManager) { 1453 + return c.json( 1454 + { error: "ReplicationNotEnabled", message: "Replication is not enabled" }, 1455 + 400, 1456 + ); 1457 + } 1458 + 1459 + if (!nodeDid) { 1460 + return c.json( 1461 + { error: "NoIdentity", message: "This node has no identity yet" }, 1462 + 400, 1463 + ); 1464 + } 1465 + 1466 + const body = await c.req.json<{ 1467 + offererDid?: string; 1468 + subjectDid?: string; 1469 + offererPdsEndpoint?: string; 1470 + params?: { minCopies?: number; intervalSec?: number; priority?: number }; 1471 + }>().catch(() => ({}) as Record<string, unknown>); 1472 + 1473 + const offererDid = body.offererDid as string | undefined; 1474 + const subjectDid = body.subjectDid as string | undefined; 1475 + 1476 + if (!offererDid || typeof offererDid !== "string") { 1477 + return c.json( 1478 + { error: "MissingParameter", message: "offererDid is required" }, 1479 + 400, 1480 + ); 1481 + } 1482 + 1483 + if (!subjectDid || typeof subjectDid !== "string") { 1484 + return c.json( 1485 + { error: "MissingParameter", message: "subjectDid is required" }, 1486 + 400, 1487 + ); 1488 + } 1489 + 1490 + if (!isValidDid(offererDid) || !isValidDid(subjectDid)) { 1491 + return c.json( 1492 + { error: "InvalidDid", message: "Invalid DID format" }, 1493 + 400, 1494 + ); 1495 + } 1496 + 1497 + // subjectDid must match our node's DID 1498 + if (subjectDid !== nodeDid) { 1499 + return c.json( 1500 + { error: "NotForUs", message: "This offer is not for this node" }, 1501 + 400, 1502 + ); 1503 + } 1504 + 1505 + // Verify the offer exists in offerer's repo (prevents spoofing) 1506 + const peerDiscovery = 
replicationManager.getPeerDiscovery(); 1507 + const repoFetcher = replicationManager.getRepoFetcher(); 1508 + const offererPdsEndpoint = (body.offererPdsEndpoint as string) ?? await repoFetcher.resolvePds(offererDid); 1509 + 1510 + if (!offererPdsEndpoint) { 1511 + return c.json( 1512 + { error: "CannotVerify", message: "Cannot resolve offerer's PDS endpoint" }, 1513 + 400, 1514 + ); 1515 + } 1516 + 1517 + const offers = await peerDiscovery.discoverOffers(offererDid, offererPdsEndpoint); 1518 + const matchingOffer = offers.find((o) => o.subject === subjectDid); 1519 + 1520 + if (!matchingOffer) { 1521 + return c.json( 1522 + { error: "OfferNotFound", message: "No matching offer found in offerer's repo" }, 1523 + 400, 1524 + ); 1525 + } 1526 + 1527 + // Discover peer info for the endpoint URL 1528 + const peerInfo = await peerDiscovery.discoverPeer(offererDid); 1529 + 1530 + // Store in incoming_offers 1531 + const syncStorage = replicationManager.getSyncStorage(); 1532 + const params = (body.params as Record<string, unknown>) ?? {}; 1533 + syncStorage.addIncomingOffer({ 1534 + offererDid, 1535 + subjectDid, 1536 + offererPdsEndpoint, 1537 + offererEndpoint: peerInfo?.endpoint ?? null, 1538 + minCopies: matchingOffer.minCopies, 1539 + intervalSec: matchingOffer.intervalSec, 1540 + priority: matchingOffer.priority, 1541 + }); 1542 + 1543 + console.log(`[replication] Received offer notification from ${offererDid} for ${subjectDid}`); 1544 + 1545 + return c.json({ status: "received" }); 1546 + } 1547 + 1548 + /** 1549 + * Accept an incoming offer (authenticated). 
1550 + */ 1551 + export async function acceptOffer( 1552 + c: Context<AuthedAppEnv>, 1553 + replicationManager: ReplicationManager | undefined, 1554 + ): Promise<Response> { 1555 + if (!replicationManager) { 1556 + return c.json( 1557 + { error: "ReplicationNotEnabled", message: "Replication is not enabled" }, 1558 + 400, 1559 + ); 1560 + } 1561 + 1562 + const body = await c.req.json<{ offererDid?: string }>().catch(() => ({}) as { offererDid?: string }); 1563 + const offererDid = body.offererDid; 1564 + if (!offererDid || typeof offererDid !== "string") { 1565 + return c.json( 1566 + { error: "MissingParameter", message: "offererDid is required" }, 1567 + 400, 1568 + ); 1569 + } 1570 + 1571 + if (!isValidDid(offererDid)) { 1572 + return c.json( 1573 + { error: "InvalidDid", message: "Invalid DID format" }, 1574 + 400, 1575 + ); 1576 + } 1577 + 1578 + const result = await replicationManager.acceptOffer(offererDid); 1579 + if (result.status === "not_found") { 1580 + return c.json( 1581 + { error: "OfferNotFound", message: "No incoming offer from this DID" }, 1582 + 404, 1583 + ); 1584 + } 1585 + if (result.status === "error") { 1586 + return c.json( 1587 + { error: "AcceptFailed", message: result.error }, 1588 + 500, 1589 + ); 1590 + } 1591 + 1592 + return c.json({ status: "accepted", offererDid }); 1593 + } 1594 + 1595 + /** 1596 + * Reject an incoming offer (authenticated). 
1597 + */ 1598 + export async function rejectOffer( 1599 + c: Context<AuthedAppEnv>, 1600 + replicationManager: ReplicationManager | undefined, 1601 + ): Promise<Response> { 1602 + if (!replicationManager) { 1603 + return c.json( 1604 + { error: "ReplicationNotEnabled", message: "Replication is not enabled" }, 1605 + 400, 1606 + ); 1607 + } 1608 + 1609 + const body = await c.req.json<{ offererDid?: string }>().catch(() => ({}) as { offererDid?: string }); 1610 + const offererDid = body.offererDid; 1611 + if (!offererDid || typeof offererDid !== "string") { 1612 + return c.json( 1613 + { error: "MissingParameter", message: "offererDid is required" }, 1614 + 400, 1615 + ); 1616 + } 1617 + 1618 + if (!isValidDid(offererDid)) { 1619 + return c.json( 1620 + { error: "InvalidDid", message: "Invalid DID format" }, 1621 + 400, 1622 + ); 1623 + } 1624 + 1625 + const result = replicationManager.rejectOffer(offererDid); 1626 + if (result.status === "not_found") { 1627 + return c.json( 1628 + { error: "OfferNotFound", message: "No incoming offer from this DID" }, 1629 + 404, 1630 + ); 1631 + } 1632 + 1633 + return c.json({ status: "rejected", offererDid }); 1634 + }