atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 129 lines 4.1 kB view raw
/**
 * libp2p stream transport for the challenge-response protocol.
 *
 * Uses a custom libp2p protocol to exchange challenges over direct P2P
 * connections, enabling proof-of-storage verification without requiring
 * peers to have public HTTP endpoints.
 *
 * Wire format (half-close request-response):
 *   1. Requester: send challenge JSON bytes, then close() (half-close write)
 *   2. Responder: read all bytes, process, send response JSON bytes, then close()
 *   3. Requester: read all response bytes
 */

import type { Libp2p, Stream } from "@libp2p/interface";
import { multiaddr } from "@multiformats/multiaddr";
import type { StorageChallenge, StorageChallengeResponse } from "./types.js";
import { MAX_CHALLENGE_SIZE } from "./types.js";
import type { ChallengeTransport } from "./transport.js";
import { serializeResponse, deserializeResponse } from "./http-transport.js";

/** Maximum response size (1 MB) — responses can include proof data. */
const MAX_RESPONSE_SIZE = 1024 * 1024;

export const CHALLENGE_PROTOCOL = "/p2pds/challenge/1.0.0";

/** Chunk shapes a stream may yield: plain bytes or a list exposing subarray(). */
type ByteSource = AsyncIterable<Uint8Array | { subarray(): Uint8Array }>;

/** Callback that answers an incoming storage challenge. */
type ChallengeHandler = (
  challenge: StorageChallenge,
) => Promise<StorageChallengeResponse>;

/**
 * Collect all chunks from a libp2p stream into a single Uint8Array.
 * Stream chunks may be Uint8Array or Uint8ArrayList; normalize via subarray().
 *
 * @param stream - Async iterable yielding the stream's chunks.
 * @param maxSize - Upper bound on the accumulated byte count.
 * @returns Every received byte, in arrival order (empty array if no chunks).
 * @throws Error if accumulated bytes exceed maxSize (abuse prevention).
 */
async function collectStream(
  stream: ByteSource,
  maxSize: number = MAX_CHALLENGE_SIZE,
): Promise<Uint8Array> {
  const chunks: Uint8Array[] = [];
  let totalSize = 0;
  for await (const chunk of stream) {
    const bytes = chunk instanceof Uint8Array ? chunk : chunk.subarray();
    totalSize += bytes.length;
    // Fail as soon as the limit is crossed instead of buffering the rest.
    if (totalSize > maxSize) {
      throw new Error(`Stream exceeded maximum size of ${maxSize} bytes`);
    }
    chunks.push(bytes);
  }

  // A lone chunk is returned as-is so no copy is made.
  const [first] = chunks;
  if (first === undefined) return new Uint8Array(0);
  if (chunks.length === 1) return first;

  const result = new Uint8Array(totalSize);
  let offset = 0;
  for (const c of chunks) {
    result.set(c, offset);
    offset += c.length;
  }
  return result;
}

/** Normalize an arbitrary thrown value into an Error for stream.abort(). */
function toError(err: unknown): Error {
  return err instanceof Error ? err : new Error(String(err));
}

/**
 * ChallengeTransport that exchanges challenges over a dedicated libp2p
 * protocol stream (CHALLENGE_PROTOCOL) using the half-close wire format
 * described at the top of this file.
 */
export class Libp2pChallengeTransport implements ChallengeTransport {
  /** Handler currently answering inbound challenges; swapped by onChallenge(). */
  private handler: ChallengeHandler | null = null;

  /**
   * Pending or completed protocol registration, or null when the protocol
   * is not registered. The stored promise never rejects (failures are
   * caught in onChallenge), so it is always safe to await.
   */
  private registration: Promise<void> | null = null;

  constructor(private libp2p: Libp2p) {}

  /**
   * Dial a peer, send one challenge, and wait for its response.
   *
   * @param targetEndpoint - Multiaddr string of the peer to dial.
   * @param challenge - Challenge to send, serialized as JSON.
   * @returns The peer's response after deserializeResponse().
   * @throws Whatever dialing, streaming, size-limit, or JSON parsing throws;
   *   once a stream is open it is aborted with that error before rethrowing.
   */
  async sendChallenge(
    targetEndpoint: string,
    challenge: StorageChallenge,
  ): Promise<StorageChallengeResponse> {
    const ma = multiaddr(targetEndpoint);
    const stream = await this.libp2p.dialProtocol(ma, CHALLENGE_PROTOCOL);

    try {
      // Send challenge JSON and half-close write end
      const challengeBytes = new TextEncoder().encode(
        JSON.stringify(challenge),
      );
      stream.send(challengeBytes);
      await stream.close(); // flush + close write; stream remains readable

      // Read response (allow up to 1 MB for proof data)
      const responseBytes = await collectStream(
        stream as unknown as ByteSource,
        MAX_RESPONSE_SIZE,
      );
      const raw = JSON.parse(new TextDecoder().decode(responseBytes));
      return deserializeResponse(raw);
    } catch (err) {
      stream.abort(toError(err));
      throw err;
    }
  }

  /**
   * Install the handler that answers inbound challenges.
   *
   * The libp2p protocol handler is registered only once; later calls just
   * swap the callback. Registering the same protocol a second time is
   * rejected by libp2p as a duplicate, which previously surfaced as an
   * unhandled promise rejection while the old handler stayed active.
   *
   * @param handler - Produces the response for a received challenge.
   */
  onChallenge(
    handler: (
      challenge: StorageChallenge,
    ) => Promise<StorageChallengeResponse>,
  ): void {
    this.handler = handler;
    if (this.registration !== null) return;

    const registration: Promise<void> = this.libp2p
      .handle(CHALLENGE_PROTOCOL, async (stream) => {
        try {
          // Dispatch through the field so the latest handler is used.
          const activeHandler = this.handler;
          if (activeHandler === null) {
            throw new Error("No challenge handler registered");
          }

          // Read challenge bytes (bounded by MAX_CHALLENGE_SIZE)
          const challengeBytes = await collectStream(
            stream as unknown as ByteSource,
          );
          // NOTE(review): the payload comes from a remote peer and is only
          // type-asserted here; the handler must validate its fields.
          const challenge = JSON.parse(
            new TextDecoder().decode(challengeBytes),
          ) as StorageChallenge;

          // Process and send response
          const response = await activeHandler(challenge);
          const serialized = serializeResponse(response);
          const responseBytes = new TextEncoder().encode(
            JSON.stringify(serialized),
          );
          stream.send(responseBytes);
          await stream.close();
        } catch (err) {
          stream.abort(toError(err));
        }
      })
      .catch((err: unknown) => {
        // onChallenge() returns void, so the failure cannot be propagated:
        // report it and clear the marker so a later call can retry.
        if (this.registration === registration) this.registration = null;
        console.error(
          `Failed to register handler for ${CHALLENGE_PROTOCOL}`,
          err,
        );
      });
    this.registration = registration;
  }

  /**
   * Unregister the challenge protocol from libp2p. A later onChallenge()
   * call registers it again.
   */
  async stop(): Promise<void> {
    const pending = this.registration;
    this.registration = null;
    // Let an in-flight handle() settle first so unhandle() cannot race it.
    if (pending !== null) await pending;
    await this.libp2p.unhandle(CHALLENGE_PROTOCOL);
  }
}