atproto user agency toolkit for individuals and groups
1/**
2 * End-to-end challenge-response test over libp2p streams.
3 *
4 * Proves two real Helia nodes can exchange MST-proof, block-sample,
5 * and combined challenges over TCP using the Libp2pChallengeTransport.
6 */
7
8import { describe, it, expect, beforeEach, afterEach } from "vitest";
9import { mkdtempSync, rmSync } from "node:fs";
10import { tmpdir } from "node:os";
11import { join } from "node:path";
12import Database from "better-sqlite3";
13import type { Helia } from "@helia/interface";
14import type { Libp2p } from "@libp2p/interface";
15import { SqliteBlockstore } from "../../sqlite-blockstore.js";
16import { SqliteDatastore } from "../../sqlite-datastore.js";
17import { readCarWithRoot } from "@atproto/repo";
18
19import { IpfsService } from "../../ipfs.js";
20import { RepoManager } from "../../repo-manager.js";
21import type { Config } from "../../config.js";
22import { generateChallenge } from "./challenge-generator.js";
23import { respondToChallenge } from "./challenge-responder.js";
24import { verifyResponse } from "./challenge-verifier.js";
25import { Libp2pChallengeTransport, CHALLENGE_PROTOCOL } from "./libp2p-transport.js";
26
/**
 * Build the Config fixture used by every test in this file.
 *
 * All values are fixed except the data directory. Networking, the firehose,
 * rate limiting and OAuth are all switched off; IPFS itself is enabled.
 *
 * @param dataDir - Directory the returned config points `DATA_DIR` at.
 * @returns A fresh config object (including a fresh `REPLICATE_DIDS` array).
 */
function testConfig(dataDir: string): Config {
  const hostname = "test.example.com";
  const signingKeyHex =
    "0000000000000000000000000000000000000000000000000000000000000001";
  const signingKeyPublic =
    "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr";

  const fixture: Config = {
    // Identity
    DID: "did:plc:test123",
    HANDLE: hostname,
    PDS_HOSTNAME: hostname,

    // Credentials (dummy values, test-only)
    AUTH_TOKEN: "test-auth-token",
    SIGNING_KEY: signingKeyHex,
    SIGNING_KEY_PUBLIC: signingKeyPublic,
    JWT_SECRET: "test-jwt-secret",
    PASSWORD_HASH: "$2a$10$test",

    // Storage and server
    DATA_DIR: dataDir,
    PORT: 3000,

    // IPFS / replication / firehose
    IPFS_ENABLED: true,
    IPFS_NETWORKING: false,
    REPLICATE_DIDS: [],
    FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos",
    FIREHOSE_ENABLED: false,

    // Rate limiting (disabled, but limits still populated)
    RATE_LIMIT_ENABLED: false,
    RATE_LIMIT_READ_PER_MIN: 300,
    RATE_LIMIT_SYNC_PER_MIN: 30,
    RATE_LIMIT_SESSION_PER_MIN: 10,
    RATE_LIMIT_WRITE_PER_MIN: 200,
    RATE_LIMIT_CHALLENGE_PER_MIN: 20,
    RATE_LIMIT_MAX_CONNECTIONS: 100,
    RATE_LIMIT_FIREHOSE_PER_IP: 3,

    // OAuth
    OAUTH_ENABLED: false,
    PUBLIC_URL: "http://localhost:3000",
  };
  return fixture;
}
57
/**
 * Create a minimal Helia node with TCP-only networking on localhost.
 *
 * The libp2p node listens on 127.0.0.1 with an OS-assigned port (`tcp/0`),
 * uses noise for connection encryption, yamux for stream multiplexing, and
 * registers only the identify service. Helia is layered on top with a
 * SQLite-backed blockstore and datastore, a bitswap block broker, and libp2p
 * routing.
 *
 * @param db - Open SQLite handle backing both the blockstore and datastore.
 *   The caller keeps ownership and must close it after stopping the node.
 * @returns The created Helia node; the caller is responsible for `stop()`.
 * @throws Whatever `createLibp2p` / `createHelia` reject with. If Helia
 *   creation fails, the already-created libp2p node is stopped first.
 */
async function createTestHeliaNode(
  db: Database.Database,
): Promise<Helia> {
  // The modules are independent of each other, so load them concurrently
  // instead of paying for eight sequential dynamic imports.
  const [
    { createHelia },
    { noise },
    { yamux },
    { tcp },
    { identify },
    { bitswap },
    { libp2pRouting },
    { createLibp2p },
  ] = await Promise.all([
    import("helia"),
    import("@chainsafe/libp2p-noise"),
    import("@chainsafe/libp2p-yamux"),
    import("@libp2p/tcp"),
    import("@libp2p/identify"),
    import("@helia/block-brokers"),
    import("@helia/routers"),
    import("libp2p"),
  ]);

  const blockstore = new SqliteBlockstore(db);
  const datastore = new SqliteDatastore(db);

  const libp2p = await createLibp2p({
    addresses: {
      listen: ["/ip4/127.0.0.1/tcp/0"],
    },
    transports: [tcp()],
    connectionEncrypters: [noise()],
    streamMuxers: [yamux()],
    services: {
      identify: identify(),
    },
  });

  try {
    return await createHelia({
      libp2p,
      // The SQLite stores are cast because their declared types are not
      // accepted by createHelia as-is. NOTE(review): confirm whether the
      // store classes can implement the interface-blockstore/datastore types.
      blockstore: blockstore as any,
      datastore: datastore as any,
      blockBrokers: [bitswap()],
      routers: [libp2pRouting(libp2p)],
    });
  } catch (err) {
    // Nobody else holds a reference to this libp2p node, so without this it
    // would stay alive (with its listener) for the rest of the test run.
    // Shutdown is best-effort; the original failure is what gets reported.
    await libp2p.stop().catch(() => {});
    throw err;
  }
}
98
/**
 * Poll a predicate until it reports true or a timeout elapses.
 *
 * The predicate is always evaluated at least once, and is evaluated one last
 * time once the deadline has been reached, so a condition that becomes true
 * during the final sleep is still observed. Sleeps are clamped to the time
 * remaining, so the total wait does not overshoot the timeout by a full
 * polling interval.
 *
 * @param fn - Condition to poll; may be synchronous or return a promise.
 * @param timeoutMs - Maximum time to keep polling, in milliseconds.
 * @param intervalMs - Delay between polls, in milliseconds.
 * @returns Resolves as soon as `fn` yields a truthy value.
 * @throws Error `waitFor timed out after <timeoutMs>ms` if the condition is
 *   still false once the deadline has passed. Errors thrown by `fn` propagate.
 */
async function waitFor(
  fn: () => Promise<boolean> | boolean,
  timeoutMs: number = 10_000,
  intervalMs: number = 200,
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    if (await fn()) return;

    const remainingMs = deadline - Date.now();
    // Checked after the predicate so the last evaluation happens at (or just
    // past) the deadline rather than one interval before it.
    if (remainingMs <= 0) break;

    const sleepMs = Math.min(intervalMs, remainingMs);
    await new Promise<void>((resolve) => setTimeout(resolve, sleepMs));
  }
  throw new Error(`waitFor timed out after ${timeoutMs}ms`);
}
111
112describe("E2E challenge-response over libp2p", () => {
113 let tmpDir: string;
114 let nodeA: Helia | null = null;
115 let nodeB: Helia | null = null;
116 let nodeDbA: Database.Database | null = null;
117 let nodeDbB: Database.Database | null = null;
118 let transportA: Libp2pChallengeTransport | null = null;
119 let transportB: Libp2pChallengeTransport | null = null;
120 let db: InstanceType<typeof Database>;
121 let ipfsService: IpfsService;
122 let repoManager: RepoManager;
123
124 beforeEach(async () => {
125 tmpDir = mkdtempSync(join(tmpdir(), "e2e-challenge-test-"));
126 const config = testConfig(tmpDir);
127
128 // Set up repo + local IpfsService (networking=false) for test data
129 db = new Database(join(tmpDir, "test.db"));
130 ipfsService = new IpfsService({
131 db,
132 networking: false,
133 });
134 await ipfsService.start();
135 repoManager = new RepoManager(db, config);
136 repoManager.init(undefined, ipfsService, ipfsService);
137
138 // Create test records
139 for (let i = 0; i < 5; i++) {
140 await repoManager.createRecord(
141 "app.bsky.feed.post",
142 undefined,
143 {
144 $type: "app.bsky.feed.post",
145 text: `E2E challenge test post ${i}`,
146 createdAt: new Date().toISOString(),
147 },
148 );
149 }
150 });
151
152 afterEach(async () => {
153 const stops: Promise<void>[] = [];
154 if (transportA) stops.push(transportA.stop().catch(() => {}));
155 if (transportB) stops.push(transportB.stop().catch(() => {}));
156 if (nodeA) stops.push(nodeA.stop().catch(() => {}));
157 if (nodeB) stops.push(nodeB.stop().catch(() => {}));
158 await Promise.all(stops);
159 transportA = null;
160 transportB = null;
161 nodeA = null;
162 nodeB = null;
163
164 if (nodeDbA) { nodeDbA.close(); nodeDbA = null; }
165 if (nodeDbB) { nodeDbB.close(); nodeDbB = null; }
166
167 if (ipfsService.isRunning()) {
168 await ipfsService.stop();
169 }
170 db.close();
171 rmSync(tmpDir, { recursive: true, force: true });
172 });
173
174 async function getRepoRootCid(): Promise<string> {
175 const carBytes = await repoManager.getRepoCar();
176 const { root, blocks } = await readCarWithRoot(carBytes);
177 await ipfsService.putBlocks(blocks);
178 return root.toString();
179 }
180
181 async function getRecordPaths(): Promise<string[]> {
182 const records = await repoManager.listRecords("app.bsky.feed.post", {
183 limit: 100,
184 });
185 return records.records.map((r) => {
186 const rkey = r.uri.split("/").pop()!;
187 return `app.bsky.feed.post/${rkey}`;
188 });
189 }
190
191 async function getBlockCids(): Promise<string[]> {
192 const carBytes = await repoManager.getRepoCar();
193 const { blocks } = await readCarWithRoot(carBytes);
194 const cids: string[] = [];
195 const internalMap = (
196 blocks as unknown as { map: Map<string, Uint8Array> }
197 ).map;
198 if (internalMap) {
199 for (const cid of internalMap.keys()) {
200 cids.push(cid);
201 }
202 }
203 return cids;
204 }
205
206 /**
207 * Set up two Helia nodes, connect them, and create transports.
208 * Also copies all repo blocks to node A's blockstore so it can respond to challenges.
209 */
210 async function setupNodes(): Promise<void> {
211 nodeDbA = new Database(join(tmpDir, "node-a.db"));
212 nodeDbB = new Database(join(tmpDir, "node-b.db"));
213 nodeA = await createTestHeliaNode(nodeDbA);
214 nodeB = await createTestHeliaNode(nodeDbB);
215
216 // Connect B -> A
217 await nodeB.libp2p.dial(nodeA.libp2p.getMultiaddrs()[0]!);
218 await waitFor(
219 () =>
220 nodeA!.libp2p.getConnections().length > 0 &&
221 nodeB!.libp2p.getConnections().length > 0,
222 5_000,
223 );
224
225 // Copy all repo blocks to node A's blockstore
226 const carBytes = await repoManager.getRepoCar();
227 const { blocks } = await readCarWithRoot(carBytes);
228 const internalMap = (
229 blocks as unknown as { map: Map<string, Uint8Array> }
230 ).map;
231 if (internalMap) {
232 const { CID } = await import("multiformats");
233 for (const [cidStr, bytes] of internalMap) {
234 const cid = CID.parse(cidStr);
235 await nodeA!.blockstore.put(cid, bytes);
236 }
237 }
238
239 transportA = new Libp2pChallengeTransport(nodeA.libp2p as unknown as Libp2p);
240 transportB = new Libp2pChallengeTransport(nodeB.libp2p as unknown as Libp2p);
241 }
242
243 it("MST proof challenge roundtrip over libp2p", { timeout: 60_000 }, async () => {
244 await setupNodes();
245
246 const rootCid = await getRepoRootCid();
247 const recordPaths = await getRecordPaths();
248
249 // Node A handles challenges using its local blockstore (wrapped as BlockStore)
250 const nodeABlockStore: import("../../ipfs.js").BlockStore = {
251 async putBlock(cidStr: string, bytes: Uint8Array) {
252 const { CID } = await import("multiformats");
253 await nodeA!.blockstore.put(CID.parse(cidStr), bytes);
254 },
255 async getBlock(cidStr: string) {
256 try {
257 const { CID } = await import("multiformats");
258 const bytes = await nodeA!.blockstore.get(CID.parse(cidStr), { offline: true } as any);
259 // Collect async generator
260 const chunks: Uint8Array[] = [];
261 for await (const chunk of bytes) {
262 chunks.push(chunk);
263 }
264 if (chunks.length === 0) return null;
265 if (chunks.length === 1) return chunks[0]!;
266 const total = chunks.reduce((acc, c) => acc + c.length, 0);
267 const result = new Uint8Array(total);
268 let offset = 0;
269 for (const c of chunks) {
270 result.set(c, offset);
271 offset += c.length;
272 }
273 return result;
274 } catch {
275 return null;
276 }
277 },
278 async hasBlock(cidStr: string) {
279 try {
280 const { CID } = await import("multiformats");
281 return await nodeA!.blockstore.has(CID.parse(cidStr));
282 } catch {
283 return false;
284 }
285 },
286 async putBlocks() {},
287 async deleteBlock() {},
288 };
289
290 // Register challenge handler on node A
291 transportA!.onChallenge(async (challenge) => {
292 return respondToChallenge(challenge, nodeABlockStore, "did:plc:prover");
293 });
294
295 // Generate challenge on node B
296 const challenge = generateChallenge({
297 challengerDid: "did:plc:verifier",
298 targetDid: "did:plc:prover",
299 subjectDid: "did:plc:test123",
300 commitCid: rootCid,
301 availableRecordPaths: recordPaths,
302 challengeType: "mst-proof",
303 epoch: 1,
304 nonce: "e2e-test-nonce",
305 config: { recordCount: 2 },
306 });
307
308 // Send challenge from B to A over libp2p
309 const addrA = nodeA!.libp2p.getMultiaddrs()[0]!.toString();
310 const response = await transportB!.sendChallenge(addrA, challenge);
311
312 expect(response.challengeId).toBe(challenge.id);
313 expect(response.mstProofs).toBeDefined();
314 expect(response.mstProofs!.length).toBe(challenge.recordPaths.length);
315
316 // Verify the response
317 const result = await verifyResponse(challenge, response, ipfsService);
318 expect(result.passed).toBe(true);
319 expect(result.mstResults).toBeDefined();
320 expect(result.mstResults!.every((r) => r.valid)).toBe(true);
321 });
322
323 it("block-sample challenge roundtrip over libp2p", { timeout: 60_000 }, async () => {
324 await setupNodes();
325
326 const rootCid = await getRepoRootCid();
327 const blockCids = await getBlockCids();
328
329 // Node A handles challenges
330 const nodeABlockStore: import("../../ipfs.js").BlockStore = {
331 async putBlock(cidStr: string, bytes: Uint8Array) {
332 const { CID } = await import("multiformats");
333 await nodeA!.blockstore.put(CID.parse(cidStr), bytes);
334 },
335 async getBlock(cidStr: string) {
336 try {
337 const { CID } = await import("multiformats");
338 const bytes = await nodeA!.blockstore.get(CID.parse(cidStr), { offline: true } as any);
339 const chunks: Uint8Array[] = [];
340 for await (const chunk of bytes) {
341 chunks.push(chunk);
342 }
343 if (chunks.length === 0) return null;
344 if (chunks.length === 1) return chunks[0]!;
345 const total = chunks.reduce((acc, c) => acc + c.length, 0);
346 const result = new Uint8Array(total);
347 let offset = 0;
348 for (const c of chunks) {
349 result.set(c, offset);
350 offset += c.length;
351 }
352 return result;
353 } catch {
354 return null;
355 }
356 },
357 async hasBlock(cidStr: string) {
358 try {
359 const { CID } = await import("multiformats");
360 return await nodeA!.blockstore.has(CID.parse(cidStr));
361 } catch {
362 return false;
363 }
364 },
365 async putBlocks() {},
366 async deleteBlock() {},
367 };
368
369 transportA!.onChallenge(async (challenge) => {
370 return respondToChallenge(challenge, nodeABlockStore, "did:plc:prover");
371 });
372
373 const challenge = generateChallenge({
374 challengerDid: "did:plc:verifier",
375 targetDid: "did:plc:prover",
376 subjectDid: "did:plc:test123",
377 commitCid: rootCid,
378 availableRecordPaths: [],
379 availableBlockCids: blockCids,
380 challengeType: "block-sample",
381 epoch: 1,
382 nonce: "e2e-block-nonce",
383 config: { blockSampleSize: 3 },
384 });
385
386 const addrA = nodeA!.libp2p.getMultiaddrs()[0]!.toString();
387 const response = await transportB!.sendChallenge(addrA, challenge);
388
389 expect(response.blockResults).toBeDefined();
390 expect(response.blockResults!.every((r) => r.available)).toBe(true);
391
392 const result = await verifyResponse(challenge, response, ipfsService);
393 expect(result.passed).toBe(true);
394 expect(result.blockResults).toBeDefined();
395 expect(result.blockResults!.every((r) => r.available && r.prefixValid)).toBe(true);
396 });
397
398 it("combined challenge roundtrip over libp2p", { timeout: 60_000 }, async () => {
399 await setupNodes();
400
401 const rootCid = await getRepoRootCid();
402 const recordPaths = await getRecordPaths();
403 const blockCids = await getBlockCids();
404
405 const nodeABlockStore: import("../../ipfs.js").BlockStore = {
406 async putBlock(cidStr: string, bytes: Uint8Array) {
407 const { CID } = await import("multiformats");
408 await nodeA!.blockstore.put(CID.parse(cidStr), bytes);
409 },
410 async getBlock(cidStr: string) {
411 try {
412 const { CID } = await import("multiformats");
413 const bytes = await nodeA!.blockstore.get(CID.parse(cidStr), { offline: true } as any);
414 const chunks: Uint8Array[] = [];
415 for await (const chunk of bytes) {
416 chunks.push(chunk);
417 }
418 if (chunks.length === 0) return null;
419 if (chunks.length === 1) return chunks[0]!;
420 const total = chunks.reduce((acc, c) => acc + c.length, 0);
421 const result = new Uint8Array(total);
422 let offset = 0;
423 for (const c of chunks) {
424 result.set(c, offset);
425 offset += c.length;
426 }
427 return result;
428 } catch {
429 return null;
430 }
431 },
432 async hasBlock(cidStr: string) {
433 try {
434 const { CID } = await import("multiformats");
435 return await nodeA!.blockstore.has(CID.parse(cidStr));
436 } catch {
437 return false;
438 }
439 },
440 async putBlocks() {},
441 async deleteBlock() {},
442 };
443
444 transportA!.onChallenge(async (challenge) => {
445 return respondToChallenge(challenge, nodeABlockStore, "did:plc:prover");
446 });
447
448 const challenge = generateChallenge({
449 challengerDid: "did:plc:verifier",
450 targetDid: "did:plc:prover",
451 subjectDid: "did:plc:test123",
452 commitCid: rootCid,
453 availableRecordPaths: recordPaths,
454 availableBlockCids: blockCids,
455 challengeType: "combined",
456 epoch: 1,
457 nonce: "e2e-combined-nonce",
458 config: { recordCount: 2, blockSampleSize: 2 },
459 });
460
461 const addrA = nodeA!.libp2p.getMultiaddrs()[0]!.toString();
462 const response = await transportB!.sendChallenge(addrA, challenge);
463
464 expect(response.mstProofs).toBeDefined();
465 expect(response.blockResults).toBeDefined();
466
467 const result = await verifyResponse(challenge, response, ipfsService);
468 expect(result.passed).toBe(true);
469 expect(result.mstResults).toBeDefined();
470 expect(result.blockResults).toBeDefined();
471 });
472
473 it("handler lifecycle: onChallenge registers, stop() unregisters", { timeout: 60_000 }, async () => {
474 await setupNodes();
475
476 // Register handler
477 transportA!.onChallenge(async (challenge) => {
478 return {
479 challengeId: challenge.id,
480 responderDid: "did:plc:prover",
481 respondedAt: new Date().toISOString(),
482 };
483 });
484
485 // Verify protocol is registered by checking that dialProtocol works
486 const addrA = nodeA!.libp2p.getMultiaddrs()[0]!.toString();
487 const ma = (await import("@multiformats/multiaddr")).multiaddr(addrA);
488 const stream = await nodeB!.libp2p.dialProtocol(ma, CHALLENGE_PROTOCOL);
489 stream.abort(new Error("test complete"));
490
491 // Unregister
492 await transportA!.stop();
493
494 // After stop, dialProtocol should fail
495 try {
496 await nodeB!.libp2p.dialProtocol(ma, CHALLENGE_PROTOCOL);
497 // If we get here, the protocol was not unregistered
498 expect.fail("Expected dialProtocol to fail after stop()");
499 } catch (err) {
500 // Expected: protocol not supported after unhandle
501 expect(err).toBeDefined();
502 }
503 });
504});