/**
 * End-to-end challenge-response test over libp2p streams.
 *
 * Proves two real Helia nodes can exchange MST-proof, block-sample,
 * and combined challenges over TCP using the Libp2pChallengeTransport.
 */

import { describe, it, expect, beforeEach, afterEach } from "vitest";
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import Database from "better-sqlite3";
import type { Helia } from "@helia/interface";
import type { Libp2p } from "@libp2p/interface";
import { SqliteBlockstore } from "../../sqlite-blockstore.js";
import { SqliteDatastore } from "../../sqlite-datastore.js";
import { readCarWithRoot } from "@atproto/repo";
import { IpfsService } from "../../ipfs.js";
import { RepoManager } from "../../repo-manager.js";
import type { Config } from "../../config.js";
import { generateChallenge } from "./challenge-generator.js";
import { respondToChallenge } from "./challenge-responder.js";
import { verifyResponse } from "./challenge-verifier.js";
import {
  Libp2pChallengeTransport,
  CHALLENGE_PROTOCOL,
} from "./libp2p-transport.js";

/** Block store contract that `respondToChallenge` is given in these tests. */
type BlockStore = import("../../ipfs.js").BlockStore;

/**
 * View of the `blocks` value returned by `readCarWithRoot` that exposes the
 * `map` property these tests read (string CID -> block bytes, judging by how
 * the entries are consumed below).
 */
type BlockMapInternals = { map?: Map<string, Uint8Array> };

/**
 * Build a fixed Config for the test PDS.
 *
 * @param dataDir - Directory assigned to `DATA_DIR`.
 * @returns A config with IPFS enabled and IPFS networking, firehose, rate
 *   limiting and OAuth all disabled.
 */
function testConfig(dataDir: string): Config {
  return {
    DID: "did:plc:test123",
    HANDLE: "test.example.com",
    PDS_HOSTNAME: "test.example.com",
    AUTH_TOKEN: "test-auth-token",
    SIGNING_KEY:
      "0000000000000000000000000000000000000000000000000000000000000001",
    SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr",
    JWT_SECRET: "test-jwt-secret",
    PASSWORD_HASH: "$2a$10$test",
    DATA_DIR: dataDir,
    PORT: 3000,
    IPFS_ENABLED: true,
    IPFS_NETWORKING: false,
    REPLICATE_DIDS: [],
    FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos",
    FIREHOSE_ENABLED: false,
    RATE_LIMIT_ENABLED: false,
    RATE_LIMIT_READ_PER_MIN: 300,
    RATE_LIMIT_SYNC_PER_MIN: 30,
    RATE_LIMIT_SESSION_PER_MIN: 10,
    RATE_LIMIT_WRITE_PER_MIN: 200,
    RATE_LIMIT_CHALLENGE_PER_MIN: 20,
    RATE_LIMIT_MAX_CONNECTIONS: 100,
    RATE_LIMIT_FIREHOSE_PER_IP: 3,
    OAUTH_ENABLED: false,
    PUBLIC_URL: "http://localhost:3000",
  };
}

/**
 * Create a minimal Helia node with TCP-only networking on localhost.
 *
 * The node listens on `/ip4/127.0.0.1/tcp/0`, uses noise + yamux, runs the
 * identify service, and stores blocks/data in the given SQLite database.
 *
 * @param db - SQLite database backing both the blockstore and the datastore.
 * @returns The started Helia node returned by `createHelia`.
 */
async function createTestHeliaNode(db: Database.Database): Promise<Helia> {
  const { createHelia } = await import("helia");
  const { noise } = await import("@chainsafe/libp2p-noise");
  const { yamux } = await import("@chainsafe/libp2p-yamux");
  const { tcp } = await import("@libp2p/tcp");
  const { identify } = await import("@libp2p/identify");
  const { bitswap } = await import("@helia/block-brokers");
  const { libp2pRouting } = await import("@helia/routers");
  const { createLibp2p } = await import("libp2p");

  const blockstore = new SqliteBlockstore(db);
  const datastore = new SqliteDatastore(db);

  const libp2p = await createLibp2p({
    addresses: {
      listen: ["/ip4/127.0.0.1/tcp/0"],
    },
    transports: [tcp()],
    connectionEncrypters: [noise()],
    streamMuxers: [yamux()],
    services: {
      identify: identify(),
    },
  });

  const helia = await createHelia({
    libp2p,
    // Casts kept from the original: the SQLite stores are passed through
    // without being checked against Helia's declared store types.
    blockstore: blockstore as any,
    datastore: datastore as any,
    blockBrokers: [bitswap()],
    routers: [libp2pRouting(libp2p)],
  });

  return helia;
}

/**
 * Poll `fn` until it returns a truthy value.
 *
 * @param fn - Condition to evaluate; may be sync or async.
 * @param timeoutMs - Maximum time to keep polling.
 * @param intervalMs - Delay between evaluations.
 * @throws Error if the condition is still falsy when the deadline passes.
 */
async function waitFor(
  fn: () => Promise<boolean> | boolean,
  timeoutMs: number = 10_000,
  intervalMs: number = 200,
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if (await fn()) return;
    await new Promise((r) => setTimeout(r, intervalMs));
  }
  throw new Error(`waitFor timed out after ${timeoutMs}ms`);
}

/**
 * Join the chunks yielded by a blockstore read into a single byte array.
 *
 * @returns `null` when there are no chunks, the chunk itself when there is
 *   exactly one, otherwise a new array containing all chunks in order.
 */
function concatChunks(chunks: Uint8Array[]): Uint8Array | null {
  if (chunks.length === 0) return null;
  if (chunks.length === 1) return chunks[0]!;

  const total = chunks.reduce((acc, c) => acc + c.length, 0);
  const joined = new Uint8Array(total);
  let offset = 0;
  for (const c of chunks) {
    joined.set(c, offset);
    offset += c.length;
  }
  return joined;
}

/**
 * Wrap a Helia node's blockstore as a `BlockStore` keyed by CID strings.
 *
 * - `getBlock` reads with `{ offline: true }`, consumes the result as an
 *   async iterable of chunks, and returns `null` on any error or empty read.
 * - `hasBlock` returns `false` on any error.
 * - `putBlocks` and `deleteBlock` are no-ops.
 *
 * @param node - Node whose blockstore backs the adapter.
 */
async function createNodeBlockStore(node: Helia): Promise<BlockStore> {
  const { CID } = await import("multiformats");

  return {
    async putBlock(cidStr: string, bytes: Uint8Array) {
      await node.blockstore.put(CID.parse(cidStr), bytes);
    },
    async getBlock(cidStr: string) {
      try {
        const bytes = await node.blockstore.get(CID.parse(cidStr), {
          offline: true,
        } as any);
        // Collect async generator
        const chunks: Uint8Array[] = [];
        for await (const chunk of bytes) {
          chunks.push(chunk);
        }
        return concatChunks(chunks);
      } catch {
        return null;
      }
    },
    async hasBlock(cidStr: string) {
      try {
        return await node.blockstore.has(CID.parse(cidStr));
      } catch {
        return false;
      }
    },
    async putBlocks() {},
    async deleteBlock() {},
  };
}

describe("E2E challenge-response over libp2p", () => {
  let tmpDir: string;
  let nodeA: Helia | null = null;
  let nodeB: Helia | null = null;
  let nodeDbA: Database.Database | null = null;
  let nodeDbB: Database.Database | null = null;
  let transportA: Libp2pChallengeTransport | null = null;
  let transportB: Libp2pChallengeTransport | null = null;

  let db: InstanceType<typeof Database>;
  let ipfsService: IpfsService;
  let repoManager: RepoManager;

  beforeEach(async () => {
    tmpDir = mkdtempSync(join(tmpdir(), "e2e-challenge-test-"));
    const config = testConfig(tmpDir);

    // Set up repo + local IpfsService (networking=false) for test data
    db = new Database(join(tmpDir, "test.db"));
    ipfsService = new IpfsService({
      db,
      networking: false,
    });
    await ipfsService.start();

    repoManager = new RepoManager(db, config);
    repoManager.init(undefined, ipfsService, ipfsService);

    // Create test records
    for (let i = 0; i < 5; i++) {
      await repoManager.createRecord("app.bsky.feed.post", undefined, {
        $type: "app.bsky.feed.post",
        text: `E2E challenge test post ${i}`,
        createdAt: new Date().toISOString(),
      });
    }
  });

  afterEach(async () => {
    // Stop transports and nodes concurrently; shutdown errors are ignored so
    // the remaining cleanup always runs.
    const stops: Promise<unknown>[] = [];
    if (transportA) stops.push(transportA.stop().catch(() => {}));
    if (transportB) stops.push(transportB.stop().catch(() => {}));
    if (nodeA) stops.push(nodeA.stop().catch(() => {}));
    if (nodeB) stops.push(nodeB.stop().catch(() => {}));
    await Promise.all(stops);
    transportA = null;
    transportB = null;
    nodeA = null;
    nodeB = null;

    if (nodeDbA) {
      nodeDbA.close();
      nodeDbA = null;
    }
    if (nodeDbB) {
      nodeDbB.close();
      nodeDbB = null;
    }

    if (ipfsService.isRunning()) {
      await ipfsService.stop();
    }
    db.close();
    rmSync(tmpDir, { recursive: true, force: true });
  });

  /**
   * Export the repo as a CAR, store its blocks in the local IpfsService (the
   * store later handed to `verifyResponse`), and return the root CID string.
   */
  async function getRepoRootCid(): Promise<string> {
    const carBytes = await repoManager.getRepoCar();
    const { root, blocks } = await readCarWithRoot(carBytes);
    await ipfsService.putBlocks(blocks);
    return root.toString();
  }

  /** List up to 100 post records as `app.bsky.feed.post/<rkey>` paths. */
  async function getRecordPaths(): Promise<string[]> {
    const records = await repoManager.listRecords("app.bsky.feed.post", {
      limit: 100,
    });
    return records.records.map((r) => {
      const rkey = r.uri.split("/").pop()!;
      return `app.bsky.feed.post/${rkey}`;
    });
  }

  /**
   * Export the repo as a CAR and return the `map` property of its parsed
   * blocks, or `undefined` when that property is absent.
   */
  async function readRepoBlocks(): Promise<
    Map<string, Uint8Array> | undefined
  > {
    const carBytes = await repoManager.getRepoCar();
    const { blocks } = await readCarWithRoot(carBytes);
    return (blocks as unknown as BlockMapInternals).map;
  }

  /** CID strings of every block in the repo CAR (empty if unavailable). */
  async function getBlockCids(): Promise<string[]> {
    const repoBlocks = await readRepoBlocks();
    return repoBlocks ? [...repoBlocks.keys()] : [];
  }

  /**
   * Set up two Helia nodes, connect them, and create transports.
   * Also copies all repo blocks to node A's blockstore so it can respond to challenges.
   */
  async function setupNodes(): Promise<void> {
    nodeDbA = new Database(join(tmpDir, "node-a.db"));
    nodeDbB = new Database(join(tmpDir, "node-b.db"));

    // Publish each node to the suite-level variable as soon as it exists so
    // afterEach can stop it even if a later step throws.
    const proverNode = await createTestHeliaNode(nodeDbA);
    nodeA = proverNode;
    const verifierNode = await createTestHeliaNode(nodeDbB);
    nodeB = verifierNode;

    // Connect B -> A
    await verifierNode.libp2p.dial(proverNode.libp2p.getMultiaddrs()[0]!);
    await waitFor(
      () =>
        proverNode.libp2p.getConnections().length > 0 &&
        verifierNode.libp2p.getConnections().length > 0,
      5_000,
    );

    // Copy all repo blocks to node A's blockstore
    const repoBlocks = await readRepoBlocks();
    if (repoBlocks) {
      const { CID } = await import("multiformats");
      for (const [cidStr, bytes] of repoBlocks) {
        await proverNode.blockstore.put(CID.parse(cidStr), bytes);
      }
    }

    transportA = new Libp2pChallengeTransport(
      proverNode.libp2p as unknown as Libp2p,
    );
    transportB = new Libp2pChallengeTransport(
      verifierNode.libp2p as unknown as Libp2p,
    );
  }

  /**
   * Register the challenge handler on node A: every incoming challenge is
   * answered by `respondToChallenge` from node A's own blockstore, as
   * "did:plc:prover". Requires `setupNodes()` to have completed.
   */
  async function registerProver(): Promise<void> {
    const nodeABlockStore = await createNodeBlockStore(nodeA!);
    transportA!.onChallenge(async (challenge) => {
      return respondToChallenge(challenge, nodeABlockStore, "did:plc:prover");
    });
  }

  it("MST proof challenge roundtrip over libp2p", { timeout: 60_000 }, async () => {
    await setupNodes();
    const rootCid = await getRepoRootCid();
    const recordPaths = await getRecordPaths();

    // Node A handles challenges using its local blockstore (wrapped as BlockStore)
    await registerProver();

    // Generate challenge on node B
    const challenge = generateChallenge({
      challengerDid: "did:plc:verifier",
      targetDid: "did:plc:prover",
      subjectDid: "did:plc:test123",
      commitCid: rootCid,
      availableRecordPaths: recordPaths,
      challengeType: "mst-proof",
      epoch: 1,
      nonce: "e2e-test-nonce",
      config: { recordCount: 2 },
    });

    // Send challenge from B to A over libp2p
    const addrA = nodeA!.libp2p.getMultiaddrs()[0]!.toString();
    const response = await transportB!.sendChallenge(addrA, challenge);

    expect(response.challengeId).toBe(challenge.id);
    expect(response.mstProofs).toBeDefined();
    expect(response.mstProofs!.length).toBe(challenge.recordPaths.length);

    // Verify the response
    const result = await verifyResponse(challenge, response, ipfsService);
    expect(result.passed).toBe(true);
    expect(result.mstResults).toBeDefined();
    expect(result.mstResults!.every((r) => r.valid)).toBe(true);
  });

  it("block-sample challenge roundtrip over libp2p", { timeout: 60_000 }, async () => {
    await setupNodes();
    const rootCid = await getRepoRootCid();
    const blockCids = await getBlockCids();

    // Node A handles challenges
    await registerProver();

    const challenge = generateChallenge({
      challengerDid: "did:plc:verifier",
      targetDid: "did:plc:prover",
      subjectDid: "did:plc:test123",
      commitCid: rootCid,
      availableRecordPaths: [],
      availableBlockCids: blockCids,
      challengeType: "block-sample",
      epoch: 1,
      nonce: "e2e-block-nonce",
      config: { blockSampleSize: 3 },
    });

    const addrA = nodeA!.libp2p.getMultiaddrs()[0]!.toString();
    const response = await transportB!.sendChallenge(addrA, challenge);

    expect(response.blockResults).toBeDefined();
    expect(response.blockResults!.every((r) => r.available)).toBe(true);

    const result = await verifyResponse(challenge, response, ipfsService);
    expect(result.passed).toBe(true);
    expect(result.blockResults).toBeDefined();
    expect(
      result.blockResults!.every((r) => r.available && r.prefixValid),
    ).toBe(true);
  });

  it("combined challenge roundtrip over libp2p", { timeout: 60_000 }, async () => {
    await setupNodes();
    const rootCid = await getRepoRootCid();
    const recordPaths = await getRecordPaths();
    const blockCids = await getBlockCids();

    await registerProver();

    const challenge = generateChallenge({
      challengerDid: "did:plc:verifier",
      targetDid: "did:plc:prover",
      subjectDid: "did:plc:test123",
      commitCid: rootCid,
      availableRecordPaths: recordPaths,
      availableBlockCids: blockCids,
      challengeType: "combined",
      epoch: 1,
      nonce: "e2e-combined-nonce",
      config: { recordCount: 2, blockSampleSize: 2 },
    });

    const addrA = nodeA!.libp2p.getMultiaddrs()[0]!.toString();
    const response = await transportB!.sendChallenge(addrA, challenge);

    expect(response.mstProofs).toBeDefined();
    expect(response.blockResults).toBeDefined();

    const result = await verifyResponse(challenge, response, ipfsService);
    expect(result.passed).toBe(true);
    expect(result.mstResults).toBeDefined();
    expect(result.blockResults).toBeDefined();
  });

  it("handler lifecycle: onChallenge registers, stop() unregisters", { timeout: 60_000 }, async () => {
    await setupNodes();

    // Register handler
    transportA!.onChallenge(async (challenge) => {
      return {
        challengeId: challenge.id,
        responderDid: "did:plc:prover",
        respondedAt: new Date().toISOString(),
      };
    });

    // Verify protocol is registered by checking that dialProtocol works
    const addrA = nodeA!.libp2p.getMultiaddrs()[0]!.toString();
    const ma = (await import("@multiformats/multiaddr")).multiaddr(addrA);
    const stream = await nodeB!.libp2p.dialProtocol(ma, CHALLENGE_PROTOCOL);
    stream.abort(new Error("test complete"));

    // Unregister
    await transportA!.stop();

    // After stop, dialProtocol should fail. Assert on the rejection directly:
    // calling expect.fail() inside a try block would be caught by the
    // accompanying catch, so the test could never fail.
    await expect(
      nodeB!.libp2p.dialProtocol(ma, CHALLENGE_PROTOCOL),
    ).rejects.toBeDefined();
  });
});