atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add multi-node integration test for gossipsub + challenge + failover

Proves gossipsub pub/sub and challenge-response streams coexist on the
same libp2p instance pair, and that FailoverChallengeTransport correctly
resolves HTTP endpoints to multiaddrs via SyncStorage.

+463
+463
src/replication/e2e-multi-node.test.ts
··· 1 + /** 2 + * Multi-node integration test: gossipsub + challenge-response + failover. 3 + * 4 + * Proves that gossipsub pub/sub and custom protocol streams (challenge-response) 5 + * coexist on the same libp2p instance pair with real networking, and that the 6 + * failover transport correctly resolves HTTP endpoints to multiaddrs via SyncStorage. 7 + */ 8 + 9 + import { describe, it, expect, beforeEach, afterEach } from "vitest"; 10 + import { mkdtempSync, rmSync } from "node:fs"; 11 + import { tmpdir } from "node:os"; 12 + import { join } from "node:path"; 13 + import Database from "better-sqlite3"; 14 + import type { Helia } from "@helia/interface"; 15 + import type { Libp2p } from "@libp2p/interface"; 16 + import { readCarWithRoot } from "@atproto/repo"; 17 + 18 + import { IpfsService, commitTopic, type CommitNotification } from "../ipfs.js"; 19 + import type { BlockStore } from "../ipfs.js"; 20 + import { RepoManager } from "../repo-manager.js"; 21 + import type { Config } from "../config.js"; 22 + import { encode as cborEncode, decode as cborDecode } from "../cbor-compat.js"; 23 + import { generateChallenge } from "./challenge-response/challenge-generator.js"; 24 + import { respondToChallenge } from "./challenge-response/challenge-responder.js"; 25 + import { verifyResponse } from "./challenge-response/challenge-verifier.js"; 26 + import { Libp2pChallengeTransport } from "./challenge-response/libp2p-transport.js"; 27 + import { FailoverChallengeTransport } from "./challenge-response/failover-transport.js"; 28 + import type { ChallengeTransport } from "./challenge-response/transport.js"; 29 + import type { StorageChallenge, StorageChallengeResponse } from "./challenge-response/types.js"; 30 + import { SyncStorage } from "./sync-storage.js"; 31 + 32 + function testConfig(dataDir: string): Config { 33 + return { 34 + DID: "did:plc:test123", 35 + HANDLE: "test.example.com", 36 + PDS_HOSTNAME: "test.example.com", 37 + AUTH_TOKEN: "test-auth-token", 38 + SIGNING_KEY: 39 + "0000000000000000000000000000000000000000000000000000000000000001", 40 + SIGNING_KEY_PUBLIC: 41 + "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr", 42 + JWT_SECRET: "test-jwt-secret", 43 + PASSWORD_HASH: "$2a$10$test", 44 + DATA_DIR: dataDir, 45 + PORT: 3000, 46 + IPFS_ENABLED: true, 47 + IPFS_NETWORKING: false, 48 + REPLICATE_DIDS: [], 49 + FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 50 + FIREHOSE_ENABLED: false, 51 + }; 52 + } 53 + 54 + /** 55 + * Create a minimal Helia node with TCP + gossipsub + identify. 56 + * Supports both gossipsub pub/sub and custom protocol streams. 57 + */ 58 + async function createGossipsubChallengeNode( 59 + blocksPath: string, 60 + datastorePath: string, 61 + ): Promise<Helia> { 62 + const { createHelia } = await import("helia"); 63 + const { noise } = await import("@chainsafe/libp2p-noise"); 64 + const { yamux } = await import("@chainsafe/libp2p-yamux"); 65 + const { tcp } = await import("@libp2p/tcp"); 66 + const { identify } = await import("@libp2p/identify"); 67 + const { gossipsub } = await import("@libp2p/gossipsub"); 68 + const { createLibp2p } = await import("libp2p"); 69 + const { FsBlockstore } = await import("blockstore-fs"); 70 + const { FsDatastore } = await import("datastore-fs"); 71 + 72 + const blockstore = new FsBlockstore(blocksPath); 73 + const datastore = new FsDatastore(datastorePath); 74 + 75 + const libp2p = await createLibp2p({ 76 + addresses: { 77 + listen: ["/ip4/127.0.0.1/tcp/0"], 78 + }, 79 + transports: [tcp()], 80 + connectionEncrypters: [noise()], 81 + streamMuxers: [yamux()], 82 + services: { 83 + identify: identify(), 84 + pubsub: gossipsub({ 85 + emitSelf: false, 86 + allowPublishToZeroTopicPeers: true, 87 + }), 88 + }, 89 + }); 90 + 91 + return createHelia({ 92 + libp2p, 93 + blockstore, 94 + datastore, 95 + }); 96 + } 97 + 98 + async function waitFor( 99 + fn: () => Promise<boolean> | boolean, 100 + timeoutMs: number = 10_000, 101 + intervalMs: number = 200, 102 + ): Promise<void> { 103 + const deadline = Date.now() + timeoutMs; 104 + while (Date.now() < deadline) { 105 + if (await fn()) return; 106 + await new Promise((r) => setTimeout(r, intervalMs)); 107 + } 108 + throw new Error(`waitFor timed out after ${timeoutMs}ms`); 109 + } 110 + 111 + /** 112 + * Wrap a Helia blockstore as the BlockStore interface used by challenge-response. 113 + */ 114 + function makeBlockStoreAdapter(helia: Helia): BlockStore { 115 + return { 116 + async putBlock(cidStr: string, bytes: Uint8Array) { 117 + const { CID } = await import("multiformats"); 118 + await helia.blockstore.put(CID.parse(cidStr), bytes); 119 + }, 120 + async getBlock(cidStr: string) { 121 + try { 122 + const { CID } = await import("multiformats"); 123 + const bytes = await helia.blockstore.get(CID.parse(cidStr), { offline: true } as any); 124 + const chunks: Uint8Array[] = []; 125 + for await (const chunk of bytes) { 126 + chunks.push(chunk); 127 + } 128 + if (chunks.length === 0) return null; 129 + if (chunks.length === 1) return chunks[0]!; 130 + const total = chunks.reduce((acc, c) => acc + c.length, 0); 131 + const result = new Uint8Array(total); 132 + let offset = 0; 133 + for (const c of chunks) { 134 + result.set(c, offset); 135 + offset += c.length; 136 + } 137 + return result; 138 + } catch { 139 + return null; 140 + } 141 + }, 142 + async hasBlock(cidStr: string) { 143 + try { 144 + const { CID } = await import("multiformats"); 145 + return await helia.blockstore.has(CID.parse(cidStr)); 146 + } catch { 147 + return false; 148 + } 149 + }, 150 + async putBlocks() {}, 151 + }; 152 + } 153 + 154 + function getPubsub(node: Helia) { 155 + return (node.libp2p.services as Record<string, unknown>).pubsub as { 156 + subscribe(topic: string): void; 157 + publish(topic: string, data: Uint8Array): Promise<unknown>; 158 + addEventListener(event: string, handler: (evt: unknown) => void): void; 159 + }; 160 + } 161 + 162 + describe("E2E multi-node: gossipsub + challenge + failover", () => { 163 + let tmpDir: string; 164 + let db: InstanceType<typeof Database>; 165 + let ipfsService: IpfsService; 166 + let repoManager: RepoManager; 167 + let nodeA: Helia | null = null; 168 + let nodeB: Helia | null = null; 169 + let transportA: Libp2pChallengeTransport | null = null; 170 + let transportB: Libp2pChallengeTransport | null = null; 171 + 172 + beforeEach(async () => { 173 + tmpDir = mkdtempSync(join(tmpdir(), "e2e-multi-node-test-")); 174 + const config = testConfig(tmpDir); 175 + 176 + db = new Database(join(tmpDir, "test.db")); 177 + ipfsService = new IpfsService({ 178 + blocksPath: join(tmpDir, "ipfs-blocks"), 179 + datastorePath: join(tmpDir, "ipfs-datastore"), 180 + networking: false, 181 + }); 182 + await ipfsService.start(); 183 + repoManager = new RepoManager(db, config); 184 + repoManager.init(undefined, ipfsService, ipfsService); 185 + 186 + // Create 5 test records 187 + for (let i = 0; i < 5; i++) { 188 + await repoManager.createRecord( 189 + "app.bsky.feed.post", 190 + undefined, 191 + { 192 + $type: "app.bsky.feed.post", 193 + text: `E2E multi-node test post ${i}`, 194 + createdAt: new Date().toISOString(), 195 + }, 196 + ); 197 + } 198 + }); 199 + 200 + afterEach(async () => { 201 + const stops: Promise<void>[] = []; 202 + if (transportA) stops.push(transportA.stop().catch(() => {})); 203 + if (transportB) stops.push(transportB.stop().catch(() => {})); 204 + if (nodeA) stops.push(nodeA.stop().catch(() => {})); 205 + if (nodeB) stops.push(nodeB.stop().catch(() => {})); 206 + await Promise.all(stops); 207 + transportA = null; 208 + transportB = null; 209 + nodeA = null; 210 + nodeB = null; 211 + 212 + if (ipfsService.isRunning()) { 213 + await ipfsService.stop(); 214 + } 215 + db.close(); 216 + rmSync(tmpDir, { recursive: true, force: true }); 217 + }); 218 + 219 + async function getRepoRootCid(): Promise<string> { 220 + const carBytes = await repoManager.getRepoCar(); 221 + const { root, blocks } = await readCarWithRoot(carBytes); 222 + await ipfsService.putBlocks(blocks); 223 + return root.toString(); 224 + } 225 + 226 + async function getRecordPaths(): Promise<string[]> { 227 + const records = await repoManager.listRecords("app.bsky.feed.post", { 228 + limit: 100, 229 + }); 230 + return records.records.map((r) => { 231 + const rkey = r.uri.split("/").pop()!; 232 + return `app.bsky.feed.post/${rkey}`; 233 + }); 234 + } 235 + 236 + /** 237 + * Create two gossipsub+challenge nodes, connect them, copy repo blocks to Node A, 238 + * and create Libp2pChallengeTransport on both. 239 + */ 240 + async function setupConnectedNodes(): Promise<void> { 241 + nodeA = await createGossipsubChallengeNode( 242 + join(tmpDir, "a-blocks"), 243 + join(tmpDir, "a-datastore"), 244 + ); 245 + nodeB = await createGossipsubChallengeNode( 246 + join(tmpDir, "b-blocks"), 247 + join(tmpDir, "b-datastore"), 248 + ); 249 + 250 + // Connect B -> A 251 + await nodeB.libp2p.dial(nodeA.libp2p.getMultiaddrs()[0]!); 252 + await waitFor( 253 + () => 254 + nodeA!.libp2p.getConnections().length > 0 && 255 + nodeB!.libp2p.getConnections().length > 0, 256 + 5_000, 257 + ); 258 + 259 + // Copy all repo blocks to node A's blockstore 260 + const carBytes = await repoManager.getRepoCar(); 261 + const { blocks } = await readCarWithRoot(carBytes); 262 + const internalMap = ( 263 + blocks as unknown as { map: Map<string, Uint8Array> } 264 + ).map; 265 + if (internalMap) { 266 + const { CID } = await import("multiformats"); 267 + for (const [cidStr, bytes] of internalMap) { 268 + const cid = CID.parse(cidStr); 269 + await nodeA!.blockstore.put(cid, bytes); 270 + } 271 + } 272 + 273 + transportA = new Libp2pChallengeTransport(nodeA.libp2p as unknown as Libp2p); 274 + transportB = new Libp2pChallengeTransport(nodeB.libp2p as unknown as Libp2p); 275 + } 276 + 277 + it( 278 + "gossipsub notification + challenge roundtrip on same node pair", 279 + { timeout: 60_000 }, 280 + async () => { 281 + await setupConnectedNodes(); 282 + 283 + const rootCid = await getRepoRootCid(); 284 + const recordPaths = await getRecordPaths(); 285 + const testDid = "did:plc:test123"; 286 + const topic = commitTopic(testDid); 287 + 288 + // Register challenge handler on Node A 289 + const nodeABlockStore = makeBlockStoreAdapter(nodeA!); 290 + transportA!.onChallenge(async (challenge) => { 291 + return respondToChallenge(challenge, nodeABlockStore, "did:plc:prover"); 292 + }); 293 + 294 + // Both nodes subscribe (needed for gossipsub mesh formation) 295 + const pubsubA = getPubsub(nodeA!); 296 + const pubsubB = getPubsub(nodeB!); 297 + pubsubA.subscribe(topic); 298 + pubsubB.subscribe(topic); 299 + 300 + // Node B collects received gossipsub notifications 301 + const received: CommitNotification[] = []; 302 + pubsubB.addEventListener("message", (evt: unknown) => { 303 + try { 304 + const detail = (evt as { detail: { topic: string; data: Uint8Array } }).detail; 305 + if (detail.topic === topic) { 306 + const notification = cborDecode(detail.data) as CommitNotification; 307 + received.push(notification); 308 + } 309 + } catch { 310 + // ignore decode errors 311 + } 312 + }); 313 + 314 + // Node A publishes CBOR notification (with retry loop for mesh formation) 315 + const notification: CommitNotification = { 316 + did: testDid, 317 + commit: rootCid, 318 + rev: "3jui7kd2xxxx2", 319 + time: new Date().toISOString(), 320 + peer: nodeA!.libp2p.peerId.toString(), 321 + }; 322 + const data = cborEncode(notification); 323 + 324 + await waitFor( 325 + async () => { 326 + if (received.length > 0) return true; 327 + await pubsubA.publish(topic, data).catch(() => {}); 328 + await new Promise((r) => setTimeout(r, 1000)); 329 + return received.length > 0; 330 + }, 331 + 30_000, 332 + 500, 333 + ); 334 + 335 + // Assert notification received with correct fields 336 + expect(received.length).toBe(1); 337 + expect(received[0]!.did).toBe(testDid); 338 + expect(received[0]!.commit).toBe(rootCid); 339 + expect(received[0]!.rev).toBe("3jui7kd2xxxx2"); 340 + expect(received[0]!.peer).toBe(nodeA!.libp2p.peerId.toString()); 341 + 342 + // Node B generates MST-proof challenge using the received commitCid 343 + const challenge = generateChallenge({ 344 + challengerDid: "did:plc:verifier", 345 + targetDid: "did:plc:prover", 346 + subjectDid: testDid, 347 + commitCid: received[0]!.commit, 348 + availableRecordPaths: recordPaths, 349 + challengeType: "mst-proof", 350 + epoch: 1, 351 + nonce: "e2e-multi-node-nonce", 352 + config: { recordCount: 2 }, 353 + }); 354 + 355 + // Node B sends challenge to Node A via libp2p 356 + const addrA = nodeA!.libp2p.getMultiaddrs()[0]!.toString(); 357 + const response = await transportB!.sendChallenge(addrA, challenge); 358 + 359 + // Assert challenge response is valid 360 + expect(response.challengeId).toBe(challenge.id); 361 + expect(response.mstProofs).toBeDefined(); 362 + expect(response.mstProofs!.length).toBe(challenge.recordPaths.length); 363 + 364 + // Verify proof cryptographically 365 + const result = await verifyResponse(challenge, response, ipfsService); 366 + expect(result.passed).toBe(true); 367 + expect(result.mstResults).toBeDefined(); 368 + expect(result.mstResults!.every((r) => r.valid)).toBe(true); 369 + }, 370 + ); 371 + 372 + it( 373 + "failover transport resolves HTTP endpoint to multiaddr via SyncStorage", 374 + { timeout: 60_000 }, 375 + async () => { 376 + await setupConnectedNodes(); 377 + 378 + const rootCid = await getRepoRootCid(); 379 + const recordPaths = await getRecordPaths(); 380 + 381 + // Register challenge handler on Node A 382 + const nodeABlockStore = makeBlockStoreAdapter(nodeA!); 383 + transportA!.onChallenge(async (challenge) => { 384 + return respondToChallenge(challenge, nodeABlockStore, "did:plc:prover"); 385 + }); 386 + 387 + // Create SyncStorage with Node A's multiaddr mapped to a fake HTTP endpoint 388 + const syncStorage = new SyncStorage(db); 389 + syncStorage.initSchema(); 390 + 391 + const fakeEndpoint = "https://pds-a.example.com"; 392 + const nodeAMultiaddr = nodeA!.libp2p.getMultiaddrs()[0]!.toString(); 393 + // Ensure multiaddr includes /p2p/ suffix with peer ID 394 + const nodeAPeerId = nodeA!.libp2p.peerId.toString(); 395 + const fullMultiaddr = nodeAMultiaddr.includes("/p2p/") 396 + ? nodeAMultiaddr 397 + : `${nodeAMultiaddr}/p2p/${nodeAPeerId}`; 398 + 399 + // Store the mapping: fake endpoint -> Node A's multiaddr 400 + syncStorage.upsertState({ 401 + did: "did:plc:nodeA", 402 + pdsEndpoint: fakeEndpoint, 403 + status: "synced", 404 + }); 405 + syncStorage.updatePeerInfo("did:plc:nodeA", nodeAPeerId, [fullMultiaddr]); 406 + 407 + // Sanity check: resolver returns a multiaddr with /p2p/ 408 + const resolved = syncStorage.getMultiaddrForPdsEndpoint(fakeEndpoint); 409 + expect(resolved).not.toBeNull(); 410 + expect(resolved!).toContain("/p2p/"); 411 + 412 + // Create mock HTTP transport that tracks calls and throws if invoked 413 + let httpCalls = 0; 414 + let fallbackCalled = false; 415 + const mockHttp: ChallengeTransport = { 416 + async sendChallenge() { 417 + httpCalls++; 418 + throw new Error("HTTP transport should not be called"); 419 + }, 420 + onChallenge() {}, 421 + }; 422 + 423 + // Create failover transport: libp2p primary, mock HTTP fallback 424 + const failover = new FailoverChallengeTransport( 425 + transportB!, 426 + mockHttp, 427 + { 428 + resolveEndpoint: (endpoint) => 429 + syncStorage.getMultiaddrForPdsEndpoint(endpoint), 430 + onFallback: () => { 431 + fallbackCalled = true; 432 + }, 433 + }, 434 + ); 435 + 436 + // Generate MST-proof challenge 437 + const challenge = generateChallenge({ 438 + challengerDid: "did:plc:verifier", 439 + targetDid: "did:plc:prover", 440 + subjectDid: "did:plc:test123", 441 + commitCid: rootCid, 442 + availableRecordPaths: recordPaths, 443 + challengeType: "mst-proof", 444 + epoch: 1, 445 + nonce: "e2e-failover-nonce", 446 + config: { recordCount: 2 }, 447 + }); 448 + 449 + // Send via failover transport using the HTTP endpoint 450 + const response = await failover.sendChallenge(fakeEndpoint, challenge); 451 + 452 + // Assert: response valid, HTTP not called, no fallback 453 + expect(response.challengeId).toBe(challenge.id); 454 + expect(response.mstProofs).toBeDefined(); 455 + expect(httpCalls).toBe(0); 456 + expect(fallbackCalled).toBe(false); 457 + 458 + // Verify proof cryptographically 459 + const result = await verifyResponse(challenge, response, ipfsService); 460 + expect(result.passed).toBe(true); 461 + }, 462 + ); 463 + });