atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add libp2p challenge transport, E2E test, and MST proof XRPC endpoint

Enables challenge-response verification over direct P2P connections
without requiring public HTTP endpoints. Adds /p2pds/challenge/1.0.0
protocol using half-close request-response streams, with fallback to
HTTP transport. New getMstProof XRPC endpoint lets light clients
request and verify MST proofs without downloading full repos.

+708 -2
+78
src/index.ts
··· 14 14 import { respondToChallenge } from "./replication/challenge-response/challenge-responder.js"; 15 15 import { serializeResponse } from "./replication/challenge-response/http-transport.js"; 16 16 import type { StorageChallenge } from "./replication/challenge-response/types.js"; 17 + import { generateMstProof } from "./replication/mst-proof.js"; 17 18 18 19 const VERSION = "0.1.0"; 19 20 ··· 481 482 peerDid ?? undefined, 482 483 ), 483 484 }); 485 + }); 486 + 487 + // ============================================ 488 + // MST Proof serving 489 + // ============================================ 490 + 491 + app.get("/xrpc/org.p2pds.verification.getMstProof", async (c) => { 492 + const did = c.req.query("did"); 493 + const recordPath = c.req.query("recordPath"); 494 + 495 + if (!did) { 496 + return c.json( 497 + { error: "MissingParameter", message: "did is required" }, 498 + 400, 499 + ); 500 + } 501 + if (!recordPath) { 502 + return c.json( 503 + { error: "MissingParameter", message: "recordPath is required" }, 504 + 400, 505 + ); 506 + } 507 + if (!replicationManager) { 508 + return c.json( 509 + { error: "ReplicationNotEnabled", message: "Replication is not enabled" }, 510 + 400, 511 + ); 512 + } 513 + if (!blockStore) { 514 + return c.json( 515 + { error: "BlockStoreNotAvailable", message: "Block store is not available" }, 516 + 400, 517 + ); 518 + } 519 + 520 + const syncState = replicationManager.getSyncStorage().getState(did); 521 + if (!syncState) { 522 + return c.json( 523 + { error: "DIDNotTracked", message: `DID not tracked: ${did}` }, 524 + 404, 525 + ); 526 + } 527 + 528 + const rootCid = syncState.rootCid; 529 + if (!rootCid) { 530 + return c.json( 531 + { error: "NoRootCid", message: `No root CID yet for ${did}` }, 532 + 404, 533 + ); 534 + } 535 + 536 + try { 537 + const proof = await generateMstProof(blockStore, rootCid, recordPath); 538 + // Serialize Uint8Array fields to base64 for JSON transport 539 + return c.json({ 540 + proof: { 541 + commitBlock: { 542 + cid: proof.commitBlock.cid, 543 + bytes: Buffer.from(proof.commitBlock.bytes).toString("base64"), 544 + }, 545 + nodes: proof.nodes.map((node) => ({ 546 + cid: node.cid, 547 + bytes: Buffer.from(node.bytes).toString("base64"), 548 + })), 549 + recordCid: proof.recordCid, 550 + found: proof.found, 551 + }, 552 + }); 553 + } catch (err) { 554 + return c.json( 555 + { 556 + error: "ProofGenerationFailed", 557 + message: err instanceof Error ? err.message : String(err), 558 + }, 559 + 500, 560 + ); 561 + } 484 562 }); 485 563 486 564 return app;
+9
src/ipfs.ts
··· 193 193 isRunning(): boolean { 194 194 return this.running; 195 195 } 196 + 197 + /** 198 + * Get the underlying libp2p node, if networking is enabled. 199 + * Typed as `unknown` at the boundary to avoid importing libp2p types 200 + * into this module. Callers cast to `Libp2p` when needed. 201 + */ 202 + getLibp2p(): unknown | null { 203 + return this.helia?.libp2p ?? null; 204 + } 196 205 }
+493
src/replication/challenge-response/e2e-challenge.test.ts
··· 1 + /** 2 + * End-to-end challenge-response test over libp2p streams. 3 + * 4 + * Proves two real Helia nodes can exchange MST-proof, block-sample, 5 + * and combined challenges over TCP using the Libp2pChallengeTransport. 6 + */ 7 + 8 + import { describe, it, expect, beforeEach, afterEach } from "vitest"; 9 + import { mkdtempSync, rmSync } from "node:fs"; 10 + import { tmpdir } from "node:os"; 11 + import { join } from "node:path"; 12 + import Database from "better-sqlite3"; 13 + import type { Helia } from "@helia/interface"; 14 + import { FsBlockstore } from "blockstore-fs"; 15 + import { FsDatastore } from "datastore-fs"; 16 + import type { Libp2p } from "@libp2p/interface"; 17 + import { readCarWithRoot } from "@atproto/repo"; 18 + 19 + import { IpfsService } from "../../ipfs.js"; 20 + import { RepoManager } from "../../repo-manager.js"; 21 + import type { Config } from "../../config.js"; 22 + import { generateChallenge } from "./challenge-generator.js"; 23 + import { respondToChallenge } from "./challenge-responder.js"; 24 + import { verifyResponse } from "./challenge-verifier.js"; 25 + import { Libp2pChallengeTransport, CHALLENGE_PROTOCOL } from "./libp2p-transport.js"; 26 + 27 + function testConfig(dataDir: string): Config { 28 + return { 29 + DID: "did:plc:test123", 30 + HANDLE: "test.example.com", 31 + PDS_HOSTNAME: "test.example.com", 32 + AUTH_TOKEN: "test-auth-token", 33 + SIGNING_KEY: 34 + "0000000000000000000000000000000000000000000000000000000000000001", 35 + SIGNING_KEY_PUBLIC: 36 + "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr", 37 + JWT_SECRET: "test-jwt-secret", 38 + PASSWORD_HASH: "$2a$10$test", 39 + DATA_DIR: dataDir, 40 + PORT: 3000, 41 + IPFS_ENABLED: true, 42 + IPFS_NETWORKING: false, 43 + REPLICATE_DIDS: [], 44 + FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 45 + FIREHOSE_ENABLED: false, 46 + }; 47 + } 48 + 49 + /** 50 + * Create a minimal Helia node with TCP-only networking on localhost. 51 + */ 52 + async function createTestHeliaNode( 53 + blocksPath: string, 54 + datastorePath: string, 55 + ): Promise<Helia> { 56 + const { createHelia } = await import("helia"); 57 + const { noise } = await import("@chainsafe/libp2p-noise"); 58 + const { yamux } = await import("@chainsafe/libp2p-yamux"); 59 + const { tcp } = await import("@libp2p/tcp"); 60 + const { identify } = await import("@libp2p/identify"); 61 + const { bitswap } = await import("@helia/block-brokers"); 62 + const { libp2pRouting } = await import("@helia/routers"); 63 + const { createLibp2p } = await import("libp2p"); 64 + 65 + const blockstore = new FsBlockstore(blocksPath); 66 + const datastore = new FsDatastore(datastorePath); 67 + 68 + const libp2p = await createLibp2p({ 69 + addresses: { 70 + listen: ["/ip4/127.0.0.1/tcp/0"], 71 + }, 72 + transports: [tcp()], 73 + connectionEncrypters: [noise()], 74 + streamMuxers: [yamux()], 75 + services: { 76 + identify: identify(), 77 + }, 78 + }); 79 + 80 + const helia = await createHelia({ 81 + libp2p, 82 + blockstore, 83 + datastore, 84 + blockBrokers: [bitswap()], 85 + routers: [libp2pRouting(libp2p)], 86 + }); 87 + 88 + return helia; 89 + } 90 + 91 + async function waitFor( 92 + fn: () => Promise<boolean> | boolean, 93 + timeoutMs: number = 10_000, 94 + intervalMs: number = 200, 95 + ): Promise<void> { 96 + const deadline = Date.now() + timeoutMs; 97 + while (Date.now() < deadline) { 98 + if (await fn()) return; 99 + await new Promise((r) => setTimeout(r, intervalMs)); 100 + } 101 + throw new Error(`waitFor timed out after ${timeoutMs}ms`); 102 + } 103 + 104 + describe("E2E challenge-response over libp2p", () => { 105 + let tmpDir: string; 106 + let nodeA: Helia | null = null; 107 + let nodeB: Helia | null = null; 108 + let transportA: Libp2pChallengeTransport | null = null; 109 + let transportB: Libp2pChallengeTransport | null = null; 110 + let db: InstanceType<typeof Database>; 111 + let ipfsService: IpfsService; 112 + let repoManager: RepoManager; 113 + 114 + beforeEach(async () => { 115 + tmpDir = mkdtempSync(join(tmpdir(), "e2e-challenge-test-")); 116 + const config = testConfig(tmpDir); 117 + 118 + // Set up repo + local IpfsService (networking=false) for test data 119 + db = new Database(join(tmpDir, "test.db")); 120 + ipfsService = new IpfsService({ 121 + blocksPath: join(tmpDir, "ipfs-blocks"), 122 + datastorePath: join(tmpDir, "ipfs-datastore"), 123 + networking: false, 124 + }); 125 + await ipfsService.start(); 126 + repoManager = new RepoManager(db, config); 127 + repoManager.init(undefined, ipfsService, ipfsService); 128 + 129 + // Create test records 130 + for (let i = 0; i < 5; i++) { 131 + await repoManager.createRecord( 132 + "app.bsky.feed.post", 133 + undefined, 134 + { 135 + $type: "app.bsky.feed.post", 136 + text: `E2E challenge test post ${i}`, 137 + createdAt: new Date().toISOString(), 138 + }, 139 + ); 140 + } 141 + }); 142 + 143 + afterEach(async () => { 144 + const stops: Promise<void>[] = []; 145 + if (transportA) stops.push(transportA.stop().catch(() => {})); 146 + if (transportB) stops.push(transportB.stop().catch(() => {})); 147 + if (nodeA) stops.push(nodeA.stop().catch(() => {})); 148 + if (nodeB) stops.push(nodeB.stop().catch(() => {})); 149 + await Promise.all(stops); 150 + transportA = null; 151 + transportB = null; 152 + nodeA = null; 153 + nodeB = null; 154 + 155 + if (ipfsService.isRunning()) { 156 + await ipfsService.stop(); 157 + } 158 + db.close(); 159 + rmSync(tmpDir, { recursive: true, force: true }); 160 + }); 161 + 162 + async function getRepoRootCid(): Promise<string> { 163 + const carBytes = await repoManager.getRepoCar(); 164 + const { root, blocks } = await readCarWithRoot(carBytes); 165 + await ipfsService.putBlocks(blocks); 166 + return root.toString(); 167 + } 168 + 169 + async function getRecordPaths(): Promise<string[]> { 170 + const records = await repoManager.listRecords("app.bsky.feed.post", { 171 + limit: 100, 172 + }); 173 + return records.records.map((r) => { 174 + const rkey = r.uri.split("/").pop()!; 175 + return `app.bsky.feed.post/${rkey}`; 176 + }); 177 + } 178 + 179 + async function getBlockCids(): Promise<string[]> { 180 + const carBytes = await repoManager.getRepoCar(); 181 + const { blocks } = await readCarWithRoot(carBytes); 182 + const cids: string[] = []; 183 + const internalMap = ( 184 + blocks as unknown as { map: Map<string, Uint8Array> } 185 + ).map; 186 + if (internalMap) { 187 + for (const cid of internalMap.keys()) { 188 + cids.push(cid); 189 + } 190 + } 191 + return cids; 192 + } 193 + 194 + /** 195 + * Set up two Helia nodes, connect them, and create transports. 196 + * Also copies all repo blocks to node A's blockstore so it can respond to challenges. 197 + */ 198 + async function setupNodes(): Promise<void> { 199 + nodeA = await createTestHeliaNode( 200 + join(tmpDir, "a-blocks"), 201 + join(tmpDir, "a-datastore"), 202 + ); 203 + nodeB = await createTestHeliaNode( 204 + join(tmpDir, "b-blocks"), 205 + join(tmpDir, "b-datastore"), 206 + ); 207 + 208 + // Connect B -> A 209 + await nodeB.libp2p.dial(nodeA.libp2p.getMultiaddrs()[0]!); 210 + await waitFor( 211 + () => 212 + nodeA!.libp2p.getConnections().length > 0 && 213 + nodeB!.libp2p.getConnections().length > 0, 214 + 5_000, 215 + ); 216 + 217 + // Copy all repo blocks to node A's blockstore 218 + const carBytes = await repoManager.getRepoCar(); 219 + const { blocks } = await readCarWithRoot(carBytes); 220 + const internalMap = ( 221 + blocks as unknown as { map: Map<string, Uint8Array> } 222 + ).map; 223 + if (internalMap) { 224 + const { CID } = await import("multiformats"); 225 + for (const [cidStr, bytes] of internalMap) { 226 + const cid = CID.parse(cidStr); 227 + await nodeA!.blockstore.put(cid, bytes); 228 + } 229 + } 230 + 231 + transportA = new Libp2pChallengeTransport(nodeA.libp2p as unknown as Libp2p); 232 + transportB = new Libp2pChallengeTransport(nodeB.libp2p as unknown as Libp2p); 233 + } 234 + 235 + it("MST proof challenge roundtrip over libp2p", { timeout: 60_000 }, async () => { 236 + await setupNodes(); 237 + 238 + const rootCid = await getRepoRootCid(); 239 + const recordPaths = await getRecordPaths(); 240 + 241 + // Node A handles challenges using its local blockstore (wrapped as BlockStore) 242 + const nodeABlockStore: import("../../ipfs.js").BlockStore = { 243 + async putBlock(cidStr: string, bytes: Uint8Array) { 244 + const { CID } = await import("multiformats"); 245 + await nodeA!.blockstore.put(CID.parse(cidStr), bytes); 246 + }, 247 + async getBlock(cidStr: string) { 248 + try { 249 + const { CID } = await import("multiformats"); 250 + const bytes = await nodeA!.blockstore.get(CID.parse(cidStr), { offline: true } as any); 251 + // Collect async generator 252 + const chunks: Uint8Array[] = []; 253 + for await (const chunk of bytes) { 254 + chunks.push(chunk); 255 + } 256 + if (chunks.length === 0) return null; 257 + if (chunks.length === 1) return chunks[0]!; 258 + const total = chunks.reduce((acc, c) => acc + c.length, 0); 259 + const result = new Uint8Array(total); 260 + let offset = 0; 261 + for (const c of chunks) { 262 + result.set(c, offset); 263 + offset += c.length; 264 + } 265 + return result; 266 + } catch { 267 + return null; 268 + } 269 + }, 270 + async hasBlock(cidStr: string) { 271 + try { 272 + const { CID } = await import("multiformats"); 273 + return await nodeA!.blockstore.has(CID.parse(cidStr)); 274 + } catch { 275 + return false; 276 + } 277 + }, 278 + async putBlocks() {}, 279 + }; 280 + 281 + // Register challenge handler on node A 282 + transportA!.onChallenge(async (challenge) => { 283 + return respondToChallenge(challenge, nodeABlockStore, "did:plc:prover"); 284 + }); 285 + 286 + // Generate challenge on node B 287 + const challenge = generateChallenge({ 288 + challengerDid: "did:plc:verifier", 289 + targetDid: "did:plc:prover", 290 + subjectDid: "did:plc:test123", 291 + commitCid: rootCid, 292 + availableRecordPaths: recordPaths, 293 + challengeType: "mst-proof", 294 + epoch: 1, 295 + nonce: "e2e-test-nonce", 296 + config: { recordCount: 2 }, 297 + }); 298 + 299 + // Send challenge from B to A over libp2p 300 + const addrA = nodeA!.libp2p.getMultiaddrs()[0]!.toString(); 301 + const response = await transportB!.sendChallenge(addrA, challenge); 302 + 303 + expect(response.challengeId).toBe(challenge.id); 304 + expect(response.mstProofs).toBeDefined(); 305 + expect(response.mstProofs!.length).toBe(challenge.recordPaths.length); 306 + 307 + // Verify the response 308 + const result = await verifyResponse(challenge, response, ipfsService); 309 + expect(result.passed).toBe(true); 310 + expect(result.mstResults).toBeDefined(); 311 + expect(result.mstResults!.every((r) => r.valid)).toBe(true); 312 + }); 313 + 314 + it("block-sample challenge roundtrip over libp2p", { timeout: 60_000 }, async () => { 315 + await setupNodes(); 316 + 317 + const rootCid = await getRepoRootCid(); 318 + const blockCids = await getBlockCids(); 319 + 320 + // Node A handles challenges 321 + const nodeABlockStore: import("../../ipfs.js").BlockStore = { 322 + async putBlock(cidStr: string, bytes: Uint8Array) { 323 + const { CID } = await import("multiformats"); 324 + await nodeA!.blockstore.put(CID.parse(cidStr), bytes); 325 + }, 326 + async getBlock(cidStr: string) { 327 + try { 328 + const { CID } = await import("multiformats"); 329 + const bytes = await nodeA!.blockstore.get(CID.parse(cidStr), { offline: true } as any); 330 + const chunks: Uint8Array[] = []; 331 + for await (const chunk of bytes) { 332 + chunks.push(chunk); 333 + } 334 + if (chunks.length === 0) return null; 335 + if (chunks.length === 1) return chunks[0]!; 336 + const total = chunks.reduce((acc, c) => acc + c.length, 0); 337 + const result = new Uint8Array(total); 338 + let offset = 0; 339 + for (const c of chunks) { 340 + result.set(c, offset); 341 + offset += c.length; 342 + } 343 + return result; 344 + } catch { 345 + return null; 346 + } 347 + }, 348 + async hasBlock(cidStr: string) { 349 + try { 350 + const { CID } = await import("multiformats"); 351 + return await nodeA!.blockstore.has(CID.parse(cidStr)); 352 + } catch { 353 + return false; 354 + } 355 + }, 356 + async putBlocks() {}, 357 + }; 358 + 359 + transportA!.onChallenge(async (challenge) => { 360 + return respondToChallenge(challenge, nodeABlockStore, "did:plc:prover"); 361 + }); 362 + 363 + const challenge = generateChallenge({ 364 + challengerDid: "did:plc:verifier", 365 + targetDid: "did:plc:prover", 366 + subjectDid: "did:plc:test123", 367 + commitCid: rootCid, 368 + availableRecordPaths: [], 369 + availableBlockCids: blockCids, 370 + challengeType: "block-sample", 371 + epoch: 1, 372 + nonce: "e2e-block-nonce", 373 + config: { blockSampleSize: 3 }, 374 + }); 375 + 376 + const addrA = nodeA!.libp2p.getMultiaddrs()[0]!.toString(); 377 + const response = await transportB!.sendChallenge(addrA, challenge); 378 + 379 + expect(response.blockResults).toBeDefined(); 380 + expect(response.blockResults!.every((r) => r.available)).toBe(true); 381 + 382 + const result = await verifyResponse(challenge, response, ipfsService); 383 + expect(result.passed).toBe(true); 384 + expect(result.blockResults).toBeDefined(); 385 + expect(result.blockResults!.every((r) => r.available && r.prefixValid)).toBe(true); 386 + }); 387 + 388 + it("combined challenge roundtrip over libp2p", { timeout: 60_000 }, async () => { 389 + await setupNodes(); 390 + 391 + const rootCid = await getRepoRootCid(); 392 + const recordPaths = await getRecordPaths(); 393 + const blockCids = await getBlockCids(); 394 + 395 + const nodeABlockStore: import("../../ipfs.js").BlockStore = { 396 + async putBlock(cidStr: string, bytes: Uint8Array) { 397 + const { CID } = await import("multiformats"); 398 + await nodeA!.blockstore.put(CID.parse(cidStr), bytes); 399 + }, 400 + async getBlock(cidStr: string) { 401 + try { 402 + const { CID } = await import("multiformats"); 403 + const bytes = await nodeA!.blockstore.get(CID.parse(cidStr), { offline: true } as any); 404 + const chunks: Uint8Array[] = []; 405 + for await (const chunk of bytes) { 406 + chunks.push(chunk); 407 + } 408 + if (chunks.length === 0) return null; 409 + if (chunks.length === 1) return chunks[0]!; 410 + const total = chunks.reduce((acc, c) => acc + c.length, 0); 411 + const result = new Uint8Array(total); 412 + let offset = 0; 413 + for (const c of chunks) { 414 + result.set(c, offset); 415 + offset += c.length; 416 + } 417 + return result; 418 + } catch { 419 + return null; 420 + } 421 + }, 422 + async hasBlock(cidStr: string) { 423 + try { 424 + const { CID } = await import("multiformats"); 425 + return await nodeA!.blockstore.has(CID.parse(cidStr)); 426 + } catch { 427 + return false; 428 + } 429 + }, 430 + async putBlocks() {}, 431 + }; 432 + 433 + transportA!.onChallenge(async (challenge) => { 434 + return respondToChallenge(challenge, nodeABlockStore, "did:plc:prover"); 435 + }); 436 + 437 + const challenge = generateChallenge({ 438 + challengerDid: "did:plc:verifier", 439 + targetDid: "did:plc:prover", 440 + subjectDid: "did:plc:test123", 441 + commitCid: rootCid, 442 + availableRecordPaths: recordPaths, 443 + availableBlockCids: blockCids, 444 + challengeType: "combined", 445 + epoch: 1, 446 + nonce: "e2e-combined-nonce", 447 + config: { recordCount: 2, blockSampleSize: 2 }, 448 + }); 449 + 450 + const addrA = nodeA!.libp2p.getMultiaddrs()[0]!.toString(); 451 + const response = await transportB!.sendChallenge(addrA, challenge); 452 + 453 + expect(response.mstProofs).toBeDefined(); 454 + expect(response.blockResults).toBeDefined(); 455 + 456 + const result = await verifyResponse(challenge, response, ipfsService); 457 + expect(result.passed).toBe(true); 458 + expect(result.mstResults).toBeDefined(); 459 + expect(result.blockResults).toBeDefined(); 460 + }); 461 + 462 + it("handler lifecycle: onChallenge registers, stop() unregisters", { timeout: 60_000 }, async () => { 463 + await setupNodes(); 464 + 465 + // Register handler 466 + transportA!.onChallenge(async (challenge) => { 467 + return { 468 + challengeId: challenge.id, 469 + responderDid: "did:plc:prover", 470 + respondedAt: new Date().toISOString(), 471 + }; 472 + }); 473 + 474 + // Verify protocol is registered by checking that dialProtocol works 475 + const addrA = nodeA!.libp2p.getMultiaddrs()[0]!.toString(); 476 + const ma = (await import("@multiformats/multiaddr")).multiaddr(addrA); 477 + const stream = await nodeB!.libp2p.dialProtocol(ma, CHALLENGE_PROTOCOL); 478 + stream.abort(new Error("test complete")); 479 + 480 + // Unregister 481 + await transportA!.stop(); 482 + 483 + // After stop, dialProtocol should fail 484 + try { 485 + await nodeB!.libp2p.dialProtocol(ma, CHALLENGE_PROTOCOL); 486 + // If we get here, the protocol was not unregistered 487 + expect.fail("Expected dialProtocol to fail after stop()"); 488 + } catch (err) { 489 + // Expected: protocol not supported after unhandle 490 + expect(err).toBeDefined(); 491 + } 492 + }); 493 + });
+1
src/replication/challenge-response/index.ts
··· 44 44 // Transport 45 45 export type { ChallengeTransport } from "./transport.js"; 46 46 export { HttpChallengeTransport } from "./http-transport.js"; 47 + export { Libp2pChallengeTransport, CHALLENGE_PROTOCOL } from "./libp2p-transport.js"; 47 48 48 49 // Scheduler 49 50 export { ChallengeScheduler } from "./challenge-scheduler.js";
+120
src/replication/challenge-response/libp2p-transport.ts
··· 1 + /** 2 + * libp2p stream transport for the challenge-response protocol. 3 + * 4 + * Uses a custom libp2p protocol to exchange challenges over direct P2P 5 + * connections, enabling proof-of-storage verification without requiring 6 + * peers to have public HTTP endpoints. 7 + * 8 + * Wire format (half-close request-response): 9 + * 1. Requester: send() challenge JSON bytes, then close() write end 10 + * 2. Responder: read all bytes, process, send() response JSON bytes, then close() 11 + * 3. Requester: read all response bytes 12 + */ 13 + 14 + import type { Libp2p, Stream } from "@libp2p/interface"; 15 + import { multiaddr } from "@multiformats/multiaddr"; 16 + import type { StorageChallenge, StorageChallengeResponse } from "./types.js"; 17 + import type { ChallengeTransport } from "./transport.js"; 18 + import { serializeResponse, deserializeResponse } from "./http-transport.js"; 19 + 20 + export const CHALLENGE_PROTOCOL = "/p2pds/challenge/1.0.0"; 21 + 22 + /** 23 + * Collect all chunks from a libp2p stream into a single Uint8Array. 24 + * Stream chunks may be Uint8Array or Uint8ArrayList; normalize via subarray(). 25 + */ 26 + async function collectStream(stream: AsyncIterable<Uint8Array | { subarray(): Uint8Array }>): Promise<Uint8Array> { 27 + const chunks: Uint8Array[] = []; 28 + for await (const chunk of stream) { 29 + if (chunk instanceof Uint8Array) { 30 + chunks.push(chunk); 31 + } else { 32 + // Uint8ArrayList — convert to Uint8Array 33 + chunks.push(chunk.subarray()); 34 + } 35 + } 36 + if (chunks.length === 0) return new Uint8Array(0); 37 + if (chunks.length === 1) return chunks[0]!; 38 + const total = chunks.reduce((acc, c) => acc + c.length, 0); 39 + const result = new Uint8Array(total); 40 + let offset = 0; 41 + for (const c of chunks) { 42 + result.set(c, offset); 43 + offset += c.length; 44 + } 45 + return result; 46 + } 47 + 48 + export class Libp2pChallengeTransport implements ChallengeTransport { 49 + private handler: 50 + | ((challenge: StorageChallenge) => Promise<StorageChallengeResponse>) 51 + | null = null; 52 + 53 + constructor(private libp2p: Libp2p) {} 54 + 55 + async sendChallenge( 56 + targetEndpoint: string, 57 + challenge: StorageChallenge, 58 + ): Promise<StorageChallengeResponse> { 59 + const ma = multiaddr(targetEndpoint); 60 + const stream = await this.libp2p.dialProtocol(ma, CHALLENGE_PROTOCOL); 61 + 62 + try { 63 + // Send challenge JSON and close write end 64 + const challengeBytes = new TextEncoder().encode( 65 + JSON.stringify(challenge), 66 + ); 67 + stream.send(challengeBytes); 68 + await stream.close(); 69 + 70 + // Read response 71 + const responseBytes = await collectStream( 72 + stream as unknown as AsyncIterable<Uint8Array | { subarray(): Uint8Array }>, 73 + ); 74 + const raw = JSON.parse(new TextDecoder().decode(responseBytes)); 75 + return deserializeResponse(raw); 76 + } catch (err) { 77 + stream.abort( 78 + err instanceof Error ? err : new Error(String(err)), 79 + ); 80 + throw err; 81 + } 82 + } 83 + 84 + onChallenge( 85 + handler: ( 86 + challenge: StorageChallenge, 87 + ) => Promise<StorageChallengeResponse>, 88 + ): void { 89 + this.handler = handler; 90 + 91 + this.libp2p.handle(CHALLENGE_PROTOCOL, async (stream: Stream) => { 92 + try { 93 + // Read challenge bytes 94 + const challengeBytes = await collectStream( 95 + stream as unknown as AsyncIterable<Uint8Array | { subarray(): Uint8Array }>, 96 + ); 97 + const challenge = JSON.parse( 98 + new TextDecoder().decode(challengeBytes), 99 + ) as StorageChallenge; 100 + 101 + // Process and send response 102 + const response = await handler(challenge); 103 + const serialized = serializeResponse(response); 104 + const responseBytes = new TextEncoder().encode( 105 + JSON.stringify(serialized), 106 + ); 107 + stream.send(responseBytes); 108 + await stream.close(); 109 + } catch (err) { 110 + stream.abort( 111 + err instanceof Error ? err : new Error(String(err)), 112 + ); 113 + } 114 + }); 115 + } 116 + 117 + async stop(): Promise<void> { 118 + await this.libp2p.unhandle(CHALLENGE_PROTOCOL); 119 + } 120 + }
+7 -2
src/server.ts
··· 18 18 import { ReplicatedRepoReader } from "./replication/replicated-repo-reader.js"; 19 19 import { PolicyEngine } from "./policy/engine.js"; 20 20 import { HttpChallengeTransport } from "./replication/challenge-response/http-transport.js"; 21 + import { Libp2pChallengeTransport } from "./replication/challenge-response/libp2p-transport.js"; 22 + import type { Libp2p } from "@libp2p/interface"; 21 23 22 24 // Load configuration 23 25 const config = loadConfig(); ··· 178 180 } 179 181 // Start challenge scheduler if policy engine is available 180 182 if (policyEngine) { 181 - const challengeTransport = new HttpChallengeTransport(); 183 + const libp2pNode = ipfsService?.getLibp2p(); 184 + const challengeTransport = libp2pNode 185 + ? new Libp2pChallengeTransport(libp2pNode as Libp2p) 186 + : new HttpChallengeTransport(); 182 187 replicationManager.startChallengeScheduler(challengeTransport); 183 - console.log(pc.dim(` Challenges: scheduler started`)); 188 + console.log(pc.dim(` Challenges: scheduler started (${libp2pNode ? "libp2p" : "HTTP"} transport)`)); 184 189 } 185 190 } catch (err) { 186 191 console.error(pc.red(` Replication startup failed:`), err);