// atproto user agency toolkit for individuals and groups
/**
 * libp2p stream transport for the challenge-response protocol.
 *
 * Uses a custom libp2p protocol to exchange challenges over direct P2P
 * connections, enabling proof-of-storage verification without requiring
 * peers to have public HTTP endpoints.
 *
 * Wire format (half-close request-response):
 * 1. Requester: send challenge JSON bytes, then close() (half-close write)
 * 2. Responder: read all bytes, process, send response JSON bytes, then close()
 * 3. Requester: read all response bytes
 */
13
14import type { Libp2p, Stream } from "@libp2p/interface";
15import { multiaddr } from "@multiformats/multiaddr";
16import type { StorageChallenge, StorageChallengeResponse } from "./types.js";
17import { MAX_CHALLENGE_SIZE } from "./types.js";
18import type { ChallengeTransport } from "./transport.js";
19import { serializeResponse, deserializeResponse } from "./http-transport.js";
20
/** Maximum response size in bytes (1 MB) — responses can include proof data. */
const MAX_RESPONSE_SIZE = 1024 * 1024;
23
/** libp2p protocol ID used when dialing and handling challenge streams. */
export const CHALLENGE_PROTOCOL = "/p2pds/challenge/1.0.0";
25
/**
 * Drain a libp2p stream into one contiguous Uint8Array.
 *
 * Chunks may be plain Uint8Arrays or list-like objects (e.g. Uint8ArrayList)
 * that expose `subarray()`; the latter are flattened via `subarray()`.
 *
 * @param stream - Async iterable of chunks, read until it ends.
 * @param maxSize - Maximum total number of bytes accepted; defaults to
 *   MAX_CHALLENGE_SIZE.
 * @returns All received bytes in arrival order. A single-chunk stream yields
 *   that chunk itself (no copy); an empty stream yields an empty array.
 * @throws RangeError if `maxSize` is NaN.
 * @throws Error as soon as the accumulated size exceeds `maxSize` (abuse
 *   prevention); the offending chunk is not buffered.
 */
async function collectStream(
  stream: AsyncIterable<Uint8Array | { subarray(): Uint8Array }>,
  maxSize: number = MAX_CHALLENGE_SIZE,
): Promise<Uint8Array> {
  // Every `>` comparison against NaN is false, so a NaN limit would silently
  // turn the size guard off. Fail closed instead.
  if (Number.isNaN(maxSize)) {
    throw new RangeError("maxSize must not be NaN");
  }

  const chunks: Uint8Array[] = [];
  let totalSize = 0;
  for await (const chunk of stream) {
    const bytes = chunk instanceof Uint8Array ? chunk : chunk.subarray();
    totalSize += bytes.length;
    // Check before buffering so an oversized stream is rejected early.
    if (totalSize > maxSize) {
      throw new Error(`Stream exceeded maximum size of ${maxSize} bytes`);
    }
    chunks.push(bytes);
  }

  if (chunks.length === 0) return new Uint8Array(0);
  // Fast path: avoid an allocation and copy when there is only one chunk.
  if (chunks.length === 1) return chunks[0]!;

  const result = new Uint8Array(totalSize);
  let offset = 0;
  for (const c of chunks) {
    result.set(c, offset);
    offset += c.length;
  }
  return result;
}
55
/**
 * ChallengeTransport that exchanges storage challenges over libp2p streams
 * negotiated with CHALLENGE_PROTOCOL.
 *
 * One stream is used per exchange: the requester writes the challenge JSON
 * and closes its write side; the responder reads to end-of-stream, writes the
 * response JSON and closes.
 */
export class Libp2pChallengeTransport implements ChallengeTransport {
  /** Handler for inbound challenges; replaced by every onChallenge() call. */
  private handler:
    | ((challenge: StorageChallenge) => Promise<StorageChallengeResponse>)
    | null = null;

  /** True once the protocol handler has been (or is being) registered. */
  private registered = false;

  constructor(private libp2p: Libp2p) {}

  /**
   * Dial a peer, send it a challenge and wait for its response.
   *
   * @param targetEndpoint - Multiaddr string of the peer to dial.
   * @param challenge - Challenge to send, serialized as JSON.
   * @returns The peer's response, passed through deserializeResponse().
   * @throws If dialing fails, or if the response is empty, exceeds
   *   MAX_RESPONSE_SIZE, or is not valid JSON. Once the stream is open, any
   *   failure aborts it before the error is rethrown.
   */
  async sendChallenge(
    targetEndpoint: string,
    challenge: StorageChallenge,
  ): Promise<StorageChallengeResponse> {
    const ma = multiaddr(targetEndpoint);
    const stream = await this.libp2p.dialProtocol(ma, CHALLENGE_PROTOCOL);

    try {
      // Send challenge JSON and half-close write end
      const challengeBytes = new TextEncoder().encode(
        JSON.stringify(challenge),
      );
      stream.send(challengeBytes);
      await stream.close(); // flush + close write; stream remains readable

      // Read response (allow up to 1 MB for proof data)
      const responseBytes = await collectStream(
        stream as unknown as AsyncIterable<
          Uint8Array | { subarray(): Uint8Array }
        >,
        MAX_RESPONSE_SIZE,
      );
      // A peer that closes without replying would otherwise surface as an
      // opaque "Unexpected end of JSON input" SyntaxError.
      if (responseBytes.length === 0) {
        throw new Error(
          "Peer closed the challenge stream without sending a response",
        );
      }
      const raw = JSON.parse(new TextDecoder().decode(responseBytes));
      return deserializeResponse(raw);
    } catch (err) {
      stream.abort(err instanceof Error ? err : new Error(String(err)));
      throw err;
    }
  }

  /**
   * Install the handler that answers inbound challenges.
   *
   * The libp2p protocol handler is registered only once; later calls just
   * swap the handler used for subsequent streams. (Registering the same
   * protocol twice makes libp2p reject, which previously surfaced as an
   * unhandled rejection while the old handler kept serving.)
   *
   * @param handler - Produces the response for a received challenge.
   */
  onChallenge(
    handler: (
      challenge: StorageChallenge,
    ) => Promise<StorageChallengeResponse>,
  ): void {
    this.handler = handler;
    if (this.registered) return;
    this.registered = true;

    // This method is synchronous by contract, so registration failures are
    // reported here instead of being left as an unhandled rejection.
    void this.libp2p
      .handle(CHALLENGE_PROTOCOL, (stream: Stream) => this.handleStream(stream))
      .catch((err: unknown) => {
        // Allow a later onChallenge() call to retry the registration.
        this.registered = false;
        console.error(
          `Failed to register handler for ${CHALLENGE_PROTOCOL}:`,
          err,
        );
      });
  }

  /** Unregister the protocol handler; onChallenge() may be called again afterwards. */
  async stop(): Promise<void> {
    this.registered = false;
    await this.libp2p.unhandle(CHALLENGE_PROTOCOL);
  }

  /**
   * Serve one inbound stream: read the challenge, run the current handler and
   * write the response. Any failure aborts the stream with the error.
   */
  private async handleStream(stream: Stream): Promise<void> {
    try {
      // Read challenge bytes (bounded by collectStream's default limit)
      const challengeBytes = await collectStream(
        stream as unknown as AsyncIterable<
          Uint8Array | { subarray(): Uint8Array }
        >,
      );
      // NOTE(review): the parsed JSON is trusted to match StorageChallenge;
      // no structural validation happens here — confirm the handler validates.
      const challenge = JSON.parse(
        new TextDecoder().decode(challengeBytes),
      ) as StorageChallenge;

      const handler = this.handler;
      if (handler === null) {
        throw new Error("No challenge handler registered");
      }

      // Process and send response
      const response = await handler(challenge);
      const serialized = serializeResponse(response);
      const responseBytes = new TextEncoder().encode(
        JSON.stringify(serialized),
      );
      stream.send(responseBytes);
      await stream.close();
    } catch (err) {
      stream.abort(err instanceof Error ? err : new Error(String(err)));
    }
  }
}
129}