atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 504 lines 16 kB view raw
1/** 2 * End-to-end challenge-response test over libp2p streams. 3 * 4 * Proves two real Helia nodes can exchange MST-proof, block-sample, 5 * and combined challenges over TCP using the Libp2pChallengeTransport. 6 */ 7 8import { describe, it, expect, beforeEach, afterEach } from "vitest"; 9import { mkdtempSync, rmSync } from "node:fs"; 10import { tmpdir } from "node:os"; 11import { join } from "node:path"; 12import Database from "better-sqlite3"; 13import type { Helia } from "@helia/interface"; 14import type { Libp2p } from "@libp2p/interface"; 15import { SqliteBlockstore } from "../../sqlite-blockstore.js"; 16import { SqliteDatastore } from "../../sqlite-datastore.js"; 17import { readCarWithRoot } from "@atproto/repo"; 18 19import { IpfsService } from "../../ipfs.js"; 20import { RepoManager } from "../../repo-manager.js"; 21import type { Config } from "../../config.js"; 22import { generateChallenge } from "./challenge-generator.js"; 23import { respondToChallenge } from "./challenge-responder.js"; 24import { verifyResponse } from "./challenge-verifier.js"; 25import { Libp2pChallengeTransport, CHALLENGE_PROTOCOL } from "./libp2p-transport.js"; 26 27function testConfig(dataDir: string): Config { 28 return { 29 DID: "did:plc:test123", 30 HANDLE: "test.example.com", 31 PDS_HOSTNAME: "test.example.com", 32 AUTH_TOKEN: "test-auth-token", 33 SIGNING_KEY: 34 "0000000000000000000000000000000000000000000000000000000000000001", 35 SIGNING_KEY_PUBLIC: 36 "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr", 37 JWT_SECRET: "test-jwt-secret", 38 PASSWORD_HASH: "$2a$10$test", 39 DATA_DIR: dataDir, 40 PORT: 3000, 41 IPFS_ENABLED: true, 42 IPFS_NETWORKING: false, 43 REPLICATE_DIDS: [], 44 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 45 FIREHOSE_ENABLED: false, 46 RATE_LIMIT_ENABLED: false, 47 RATE_LIMIT_READ_PER_MIN: 300, 48 RATE_LIMIT_SYNC_PER_MIN: 30, 49 RATE_LIMIT_SESSION_PER_MIN: 10, 50 RATE_LIMIT_WRITE_PER_MIN: 200, 51 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 52 RATE_LIMIT_MAX_CONNECTIONS: 100, 53 RATE_LIMIT_FIREHOSE_PER_IP: 3, 54 OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 55 }; 56} 57 58/** 59 * Create a minimal Helia node with TCP-only networking on localhost. 60 */ 61async function createTestHeliaNode( 62 db: Database.Database, 63): Promise<Helia> { 64 const { createHelia } = await import("helia"); 65 const { noise } = await import("@chainsafe/libp2p-noise"); 66 const { yamux } = await import("@chainsafe/libp2p-yamux"); 67 const { tcp } = await import("@libp2p/tcp"); 68 const { identify } = await import("@libp2p/identify"); 69 const { bitswap } = await import("@helia/block-brokers"); 70 const { libp2pRouting } = await import("@helia/routers"); 71 const { createLibp2p } = await import("libp2p"); 72 73 const blockstore = new SqliteBlockstore(db); 74 const datastore = new SqliteDatastore(db); 75 76 const libp2p = await createLibp2p({ 77 addresses: { 78 listen: ["/ip4/127.0.0.1/tcp/0"], 79 }, 80 transports: [tcp()], 81 connectionEncrypters: [noise()], 82 streamMuxers: [yamux()], 83 services: { 84 identify: identify(), 85 }, 86 }); 87 88 const helia = await createHelia({ 89 libp2p, 90 blockstore: blockstore as any, 91 datastore: datastore as any, 92 blockBrokers: [bitswap()], 93 routers: [libp2pRouting(libp2p)], 94 }); 95 96 return helia; 97} 98 99async function waitFor( 100 fn: () => Promise<boolean> | boolean, 101 timeoutMs: number = 10_000, 102 intervalMs: number = 200, 103): Promise<void> { 104 const deadline = Date.now() + timeoutMs; 105 while (Date.now() < deadline) { 106 if (await fn()) return; 107 await new Promise((r) => setTimeout(r, intervalMs)); 108 } 109 throw new Error(`waitFor timed out after ${timeoutMs}ms`); 110} 111 112describe("E2E challenge-response over libp2p", () => { 113 let tmpDir: string; 114 let nodeA: Helia | null = null; 115 let nodeB: Helia | null = null; 116 let nodeDbA: Database.Database | null = null; 117 let nodeDbB: Database.Database | null = null; 118 let transportA: Libp2pChallengeTransport | null = null; 119 let transportB: Libp2pChallengeTransport | null = null; 120 let db: InstanceType<typeof Database>; 121 let ipfsService: IpfsService; 122 let repoManager: RepoManager; 123 124 beforeEach(async () => { 125 tmpDir = mkdtempSync(join(tmpdir(), "e2e-challenge-test-")); 126 const config = testConfig(tmpDir); 127 128 // Set up repo + local IpfsService (networking=false) for test data 129 db = new Database(join(tmpDir, "test.db")); 130 ipfsService = new IpfsService({ 131 db, 132 networking: false, 133 }); 134 await ipfsService.start(); 135 repoManager = new RepoManager(db, config); 136 repoManager.init(undefined, ipfsService, ipfsService); 137 138 // Create test records 139 for (let i = 0; i < 5; i++) { 140 await repoManager.createRecord( 141 "app.bsky.feed.post", 142 undefined, 143 { 144 $type: "app.bsky.feed.post", 145 text: `E2E challenge test post ${i}`, 146 createdAt: new Date().toISOString(), 147 }, 148 ); 149 } 150 }); 151 152 afterEach(async () => { 153 const stops: Promise<void>[] = []; 154 if (transportA) stops.push(transportA.stop().catch(() => {})); 155 if (transportB) stops.push(transportB.stop().catch(() => {})); 156 if (nodeA) stops.push(nodeA.stop().catch(() => {})); 157 if (nodeB) stops.push(nodeB.stop().catch(() => {})); 158 await Promise.all(stops); 159 transportA = null; 160 transportB = null; 161 nodeA = null; 162 nodeB = null; 163 164 if (nodeDbA) { nodeDbA.close(); nodeDbA = null; } 165 if (nodeDbB) { nodeDbB.close(); nodeDbB = null; } 166 167 if (ipfsService.isRunning()) { 168 await ipfsService.stop(); 169 } 170 db.close(); 171 rmSync(tmpDir, { recursive: true, force: true }); 172 }); 173 174 async function getRepoRootCid(): Promise<string> { 175 const carBytes = await repoManager.getRepoCar(); 176 const { root, blocks } = await readCarWithRoot(carBytes); 177 await ipfsService.putBlocks(blocks); 178 return root.toString(); 179 } 180 181 async function getRecordPaths(): Promise<string[]> { 182 const records = await repoManager.listRecords("app.bsky.feed.post", { 183 limit: 100, 184 }); 185 return records.records.map((r) => { 186 const rkey = r.uri.split("/").pop()!; 187 return `app.bsky.feed.post/${rkey}`; 188 }); 189 } 190 191 async function getBlockCids(): Promise<string[]> { 192 const carBytes = await repoManager.getRepoCar(); 193 const { blocks } = await readCarWithRoot(carBytes); 194 const cids: string[] = []; 195 const internalMap = ( 196 blocks as unknown as { map: Map<string, Uint8Array> } 197 ).map; 198 if (internalMap) { 199 for (const cid of internalMap.keys()) { 200 cids.push(cid); 201 } 202 } 203 return cids; 204 } 205 206 /** 207 * Set up two Helia nodes, connect them, and create transports. 208 * Also copies all repo blocks to node A's blockstore so it can respond to challenges. 209 */ 210 async function setupNodes(): Promise<void> { 211 nodeDbA = new Database(join(tmpDir, "node-a.db")); 212 nodeDbB = new Database(join(tmpDir, "node-b.db")); 213 nodeA = await createTestHeliaNode(nodeDbA); 214 nodeB = await createTestHeliaNode(nodeDbB); 215 216 // Connect B -> A 217 await nodeB.libp2p.dial(nodeA.libp2p.getMultiaddrs()[0]!); 218 await waitFor( 219 () => 220 nodeA!.libp2p.getConnections().length > 0 && 221 nodeB!.libp2p.getConnections().length > 0, 222 5_000, 223 ); 224 225 // Copy all repo blocks to node A's blockstore 226 const carBytes = await repoManager.getRepoCar(); 227 const { blocks } = await readCarWithRoot(carBytes); 228 const internalMap = ( 229 blocks as unknown as { map: Map<string, Uint8Array> } 230 ).map; 231 if (internalMap) { 232 const { CID } = await import("multiformats"); 233 for (const [cidStr, bytes] of internalMap) { 234 const cid = CID.parse(cidStr); 235 await nodeA!.blockstore.put(cid, bytes); 236 } 237 } 238 239 transportA = new Libp2pChallengeTransport(nodeA.libp2p as unknown as Libp2p); 240 transportB = new Libp2pChallengeTransport(nodeB.libp2p as unknown as Libp2p); 241 } 242 243 it("MST proof challenge roundtrip over libp2p", { timeout: 60_000 }, async () => { 244 await setupNodes(); 245 246 const rootCid = await getRepoRootCid(); 247 const recordPaths = await getRecordPaths(); 248 249 // Node A handles challenges using its local blockstore (wrapped as BlockStore) 250 const nodeABlockStore: import("../../ipfs.js").BlockStore = { 251 async putBlock(cidStr: string, bytes: Uint8Array) { 252 const { CID } = await import("multiformats"); 253 await nodeA!.blockstore.put(CID.parse(cidStr), bytes); 254 }, 255 async getBlock(cidStr: string) { 256 try { 257 const { CID } = await import("multiformats"); 258 const bytes = await nodeA!.blockstore.get(CID.parse(cidStr), { offline: true } as any); 259 // Collect async generator 260 const chunks: Uint8Array[] = []; 261 for await (const chunk of bytes) { 262 chunks.push(chunk); 263 } 264 if (chunks.length === 0) return null; 265 if (chunks.length === 1) return chunks[0]!; 266 const total = chunks.reduce((acc, c) => acc + c.length, 0); 267 const result = new Uint8Array(total); 268 let offset = 0; 269 for (const c of chunks) { 270 result.set(c, offset); 271 offset += c.length; 272 } 273 return result; 274 } catch { 275 return null; 276 } 277 }, 278 async hasBlock(cidStr: string) { 279 try { 280 const { CID } = await import("multiformats"); 281 return await nodeA!.blockstore.has(CID.parse(cidStr)); 282 } catch { 283 return false; 284 } 285 }, 286 async putBlocks() {}, 287 async deleteBlock() {}, 288 }; 289 290 // Register challenge handler on node A 291 transportA!.onChallenge(async (challenge) => { 292 return respondToChallenge(challenge, nodeABlockStore, "did:plc:prover"); 293 }); 294 295 // Generate challenge on node B 296 const challenge = generateChallenge({ 297 challengerDid: "did:plc:verifier", 298 targetDid: "did:plc:prover", 299 subjectDid: "did:plc:test123", 300 commitCid: rootCid, 301 availableRecordPaths: recordPaths, 302 challengeType: "mst-proof", 303 epoch: 1, 304 nonce: "e2e-test-nonce", 305 config: { recordCount: 2 }, 306 }); 307 308 // Send challenge from B to A over libp2p 309 const addrA = nodeA!.libp2p.getMultiaddrs()[0]!.toString(); 310 const response = await transportB!.sendChallenge(addrA, challenge); 311 312 expect(response.challengeId).toBe(challenge.id); 313 expect(response.mstProofs).toBeDefined(); 314 expect(response.mstProofs!.length).toBe(challenge.recordPaths.length); 315 316 // Verify the response 317 const result = await verifyResponse(challenge, response, ipfsService); 318 expect(result.passed).toBe(true); 319 expect(result.mstResults).toBeDefined(); 320 expect(result.mstResults!.every((r) => r.valid)).toBe(true); 321 }); 322 323 it("block-sample challenge roundtrip over libp2p", { timeout: 60_000 }, async () => { 324 await setupNodes(); 325 326 const rootCid = await getRepoRootCid(); 327 const blockCids = await getBlockCids(); 328 329 // Node A handles challenges 330 const nodeABlockStore: import("../../ipfs.js").BlockStore = { 331 async putBlock(cidStr: string, bytes: Uint8Array) { 332 const { CID } = await import("multiformats"); 333 await nodeA!.blockstore.put(CID.parse(cidStr), bytes); 334 }, 335 async getBlock(cidStr: string) { 336 try { 337 const { CID } = await import("multiformats"); 338 const bytes = await nodeA!.blockstore.get(CID.parse(cidStr), { offline: true } as any); 339 const chunks: Uint8Array[] = []; 340 for await (const chunk of bytes) { 341 chunks.push(chunk); 342 } 343 if (chunks.length === 0) return null; 344 if (chunks.length === 1) return chunks[0]!; 345 const total = chunks.reduce((acc, c) => acc + c.length, 0); 346 const result = new Uint8Array(total); 347 let offset = 0; 348 for (const c of chunks) { 349 result.set(c, offset); 350 offset += c.length; 351 } 352 return result; 353 } catch { 354 return null; 355 } 356 }, 357 async hasBlock(cidStr: string) { 358 try { 359 const { CID } = await import("multiformats"); 360 return await nodeA!.blockstore.has(CID.parse(cidStr)); 361 } catch { 362 return false; 363 } 364 }, 365 async putBlocks() {}, 366 async deleteBlock() {}, 367 }; 368 369 transportA!.onChallenge(async (challenge) => { 370 return respondToChallenge(challenge, nodeABlockStore, "did:plc:prover"); 371 }); 372 373 const challenge = generateChallenge({ 374 challengerDid: "did:plc:verifier", 375 targetDid: "did:plc:prover", 376 subjectDid: "did:plc:test123", 377 commitCid: rootCid, 378 availableRecordPaths: [], 379 availableBlockCids: blockCids, 380 challengeType: "block-sample", 381 epoch: 1, 382 nonce: "e2e-block-nonce", 383 config: { blockSampleSize: 3 }, 384 }); 385 386 const addrA = nodeA!.libp2p.getMultiaddrs()[0]!.toString(); 387 const response = await transportB!.sendChallenge(addrA, challenge); 388 389 expect(response.blockResults).toBeDefined(); 390 expect(response.blockResults!.every((r) => r.available)).toBe(true); 391 392 const result = await verifyResponse(challenge, response, ipfsService); 393 expect(result.passed).toBe(true); 394 expect(result.blockResults).toBeDefined(); 395 expect(result.blockResults!.every((r) => r.available && r.prefixValid)).toBe(true); 396 }); 397 398 it("combined challenge roundtrip over libp2p", { timeout: 60_000 }, async () => { 399 await setupNodes(); 400 401 const rootCid = await getRepoRootCid(); 402 const recordPaths = await getRecordPaths(); 403 const blockCids = await getBlockCids(); 404 405 const nodeABlockStore: import("../../ipfs.js").BlockStore = { 406 async putBlock(cidStr: string, bytes: Uint8Array) { 407 const { CID } = await import("multiformats"); 408 await nodeA!.blockstore.put(CID.parse(cidStr), bytes); 409 }, 410 async getBlock(cidStr: string) { 411 try { 412 const { CID } = await import("multiformats"); 413 const bytes = await nodeA!.blockstore.get(CID.parse(cidStr), { offline: true } as any); 414 const chunks: Uint8Array[] = []; 415 for await (const chunk of bytes) { 416 chunks.push(chunk); 417 } 418 if (chunks.length === 0) return null; 419 if (chunks.length === 1) return chunks[0]!; 420 const total = chunks.reduce((acc, c) => acc + c.length, 0); 421 const result = new Uint8Array(total); 422 let offset = 0; 423 for (const c of chunks) { 424 result.set(c, offset); 425 offset += c.length; 426 } 427 return result; 428 } catch { 429 return null; 430 } 431 }, 432 async hasBlock(cidStr: string) { 433 try { 434 const { CID } = await import("multiformats"); 435 return await nodeA!.blockstore.has(CID.parse(cidStr)); 436 } catch { 437 return false; 438 } 439 }, 440 async putBlocks() {}, 441 async deleteBlock() {}, 442 }; 443 444 transportA!.onChallenge(async (challenge) => { 445 return respondToChallenge(challenge, nodeABlockStore, "did:plc:prover"); 446 }); 447 448 const challenge = generateChallenge({ 449 challengerDid: "did:plc:verifier", 450 targetDid: "did:plc:prover", 451 subjectDid: "did:plc:test123", 452 commitCid: rootCid, 453 availableRecordPaths: recordPaths, 454 availableBlockCids: blockCids, 455 challengeType: "combined", 456 epoch: 1, 457 nonce: "e2e-combined-nonce", 458 config: { recordCount: 2, blockSampleSize: 2 }, 459 }); 460 461 const addrA = nodeA!.libp2p.getMultiaddrs()[0]!.toString(); 462 const response = await transportB!.sendChallenge(addrA, challenge); 463 464 expect(response.mstProofs).toBeDefined(); 465 expect(response.blockResults).toBeDefined(); 466 467 const result = await verifyResponse(challenge, response, ipfsService); 468 expect(result.passed).toBe(true); 469 expect(result.mstResults).toBeDefined(); 470 expect(result.blockResults).toBeDefined(); 471 }); 472 473 it("handler lifecycle: onChallenge registers, stop() unregisters", { timeout: 60_000 }, async () => { 474 await setupNodes(); 475 476 // Register handler 477 transportA!.onChallenge(async (challenge) => { 478 return { 479 challengeId: challenge.id, 480 responderDid: "did:plc:prover", 481 respondedAt: new Date().toISOString(), 482 }; 483 }); 484 485 // Verify protocol is registered by checking that dialProtocol works 486 const addrA = nodeA!.libp2p.getMultiaddrs()[0]!.toString(); 487 const ma = (await import("@multiformats/multiaddr")).multiaddr(addrA); 488 const stream = await nodeB!.libp2p.dialProtocol(ma, CHALLENGE_PROTOCOL); 489 stream.abort(new Error("test complete")); 490 491 // Unregister 492 await transportA!.stop(); 493 494 // After stop, dialProtocol should fail 495 try { 496 await nodeB!.libp2p.dialProtocol(ma, CHALLENGE_PROTOCOL); 497 // If we get here, the protocol was not unregistered 498 expect.fail("Expected dialProtocol to fail after stop()"); 499 } catch (err) { 500 // Expected: protocol not supported after unhandle 501 expect(err).toBeDefined(); 502 } 503 }); 504});