atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add failover challenge transport: try libp2p first, fall back to HTTP

FailoverChallengeTransport wraps a primary and fallback ChallengeTransport.
On sendChallenge, tries the primary; on failure, invokes onFallback callback
and retries via fallback. Optional resolveEndpoint hook maps HTTP URLs to
multiaddrs for the libp2p transport. server.ts now uses failover when
libp2p is available.

+264 -4
+180
src/replication/challenge-response/failover-transport.test.ts
··· 1 + import { describe, it, expect, vi } from "vitest"; 2 + import { FailoverChallengeTransport } from "./failover-transport.js"; 3 + import type { ChallengeTransport } from "./transport.js"; 4 + import type { 5 + StorageChallenge, 6 + StorageChallengeResponse, 7 + } from "./types.js"; 8 + 9 + // ============================================ 10 + // Mock transport 11 + // ============================================ 12 + 13 + function makeMockTransport(opts?: { 14 + shouldFail?: boolean; 15 + failError?: Error; 16 + }): ChallengeTransport & { 17 + sendCalls: Array<{ endpoint: string; challenge: StorageChallenge }>; 18 + onChallengeCalls: number; 19 + stopCalled: boolean; 20 + stop: () => Promise<void>; 21 + } { 22 + const mock = { 23 + sendCalls: [] as Array<{ endpoint: string; challenge: StorageChallenge }>, 24 + onChallengeCalls: 0, 25 + stopCalled: false, 26 + 27 + async sendChallenge( 28 + endpoint: string, 29 + challenge: StorageChallenge, 30 + ): Promise<StorageChallengeResponse> { 31 + mock.sendCalls.push({ endpoint, challenge }); 32 + if (opts?.shouldFail) { 33 + throw opts.failError ?? new Error("transport failed"); 34 + } 35 + return { 36 + challengeId: challenge.id, 37 + responderDid: challenge.targetDid, 38 + respondedAt: new Date().toISOString(), 39 + }; 40 + }, 41 + 42 + onChallenge( 43 + _handler: (challenge: StorageChallenge) => Promise<StorageChallengeResponse>, 44 + ): void { 45 + mock.onChallengeCalls++; 46 + }, 47 + 48 + async stop(): Promise<void> { 49 + mock.stopCalled = true; 50 + }, 51 + }; 52 + return mock; 53 + } 54 + 55 + function makeChallenge(id = "test-challenge"): StorageChallenge { 56 + return { 57 + id, 58 + version: 1, 59 + challengerDid: "did:plc:challenger", 60 + targetDid: "did:plc:target", 61 + subjectDid: "did:plc:subject", 62 + commitCid: "bafycommit", 63 + challengeType: "mst-proof", 64 + recordPaths: ["app.bsky.feed.post/abc"], 65 + issuedAt: new Date().toISOString(), 66 + expiresAt: new Date(Date.now() + 60000).toISOString(), 67 + epoch: 1, 68 + nonce: "test-nonce", 69 + }; 70 + } 71 + 72 + // ============================================ 73 + // Tests 74 + // ============================================ 75 + 76 + describe("FailoverChallengeTransport", () => { 77 + it("uses primary when it succeeds", async () => { 78 + const primary = makeMockTransport(); 79 + const fallback = makeMockTransport(); 80 + const transport = new FailoverChallengeTransport(primary, fallback); 81 + 82 + const challenge = makeChallenge(); 83 + const response = await transport.sendChallenge("https://target.example", challenge); 84 + 85 + expect(response.challengeId).toBe("test-challenge"); 86 + expect(primary.sendCalls).toHaveLength(1); 87 + expect(fallback.sendCalls).toHaveLength(0); 88 + }); 89 + 90 + it("falls back when primary fails", async () => { 91 + const primary = makeMockTransport({ shouldFail: true }); 92 + const fallback = makeMockTransport(); 93 + const onFallback = vi.fn(); 94 + const transport = new FailoverChallengeTransport(primary, fallback, { onFallback }); 95 + 96 + const challenge = makeChallenge(); 97 + const response = await transport.sendChallenge("https://target.example", challenge); 98 + 99 + expect(response.challengeId).toBe("test-challenge"); 100 + expect(primary.sendCalls).toHaveLength(1); 101 + expect(fallback.sendCalls).toHaveLength(1); 102 + expect(onFallback).toHaveBeenCalledOnce(); 103 + expect(onFallback.mock.calls[0]![0]).toBe("https://target.example"); 104 + expect(onFallback.mock.calls[0]![1]).toBeInstanceOf(Error); 105 + }); 106 + 107 + it("throws when both fail", async () => { 108 + const primary = makeMockTransport({ 109 + shouldFail: true, 110 + failError: new Error("libp2p down"), 111 + }); 112 + const fallback = makeMockTransport({ 113 + shouldFail: true, 114 + failError: new Error("http down"), 115 + }); 116 + const transport = new FailoverChallengeTransport(primary, fallback); 117 + 118 + await expect( 119 + transport.sendChallenge("https://target.example", makeChallenge()), 120 + ).rejects.toThrow("http down"); 121 + 122 + expect(primary.sendCalls).toHaveLength(1); 123 + expect(fallback.sendCalls).toHaveLength(1); 124 + }); 125 + 126 + it("skips primary when resolveEndpoint returns null", async () => { 127 + const primary = makeMockTransport(); 128 + const fallback = makeMockTransport(); 129 + const transport = new FailoverChallengeTransport(primary, fallback, { 130 + resolveEndpoint: () => null, 131 + }); 132 + 133 + const response = await transport.sendChallenge("https://target.example", makeChallenge()); 134 + 135 + expect(response.challengeId).toBe("test-challenge"); 136 + expect(primary.sendCalls).toHaveLength(0); 137 + expect(fallback.sendCalls).toHaveLength(1); 138 + }); 139 + 140 + it("passes resolved endpoint to primary", async () => { 141 + const primary = makeMockTransport(); 142 + const fallback = makeMockTransport(); 143 + const transport = new FailoverChallengeTransport(primary, fallback, { 144 + resolveEndpoint: (http) => 145 + http === "https://target.example" ? "/ip4/127.0.0.1/tcp/4001/p2p/QmPeer" : null, 146 + }); 147 + 148 + await transport.sendChallenge("https://target.example", makeChallenge()); 149 + 150 + expect(primary.sendCalls).toHaveLength(1); 151 + expect(primary.sendCalls[0]!.endpoint).toBe("/ip4/127.0.0.1/tcp/4001/p2p/QmPeer"); 152 + expect(fallback.sendCalls).toHaveLength(0); 153 + }); 154 + 155 + it("registers onChallenge handler on both transports", () => { 156 + const primary = makeMockTransport(); 157 + const fallback = makeMockTransport(); 158 + const transport = new FailoverChallengeTransport(primary, fallback); 159 + 160 + transport.onChallenge(async () => ({ 161 + challengeId: "x", 162 + responderDid: "did:plc:me", 163 + respondedAt: new Date().toISOString(), 164 + })); 165 + 166 + expect(primary.onChallengeCalls).toBe(1); 167 + expect(fallback.onChallengeCalls).toBe(1); 168 + }); 169 + 170 + it("stop() delegates to both transports", async () => { 171 + const primary = makeMockTransport(); 172 + const fallback = makeMockTransport(); 173 + const transport = new FailoverChallengeTransport(primary, fallback); 174 + 175 + await transport.stop(); 176 + 177 + expect(primary.stopCalled).toBe(true); 178 + expect(fallback.stopCalled).toBe(true); 179 + }); 180 + });
+65
src/replication/challenge-response/failover-transport.ts
··· 1 + /** 2 + * Failover challenge transport: tries a primary transport (e.g. libp2p), 3 + * falls back to a secondary (e.g. HTTP) on failure. 4 + * 5 + * The optional resolveEndpoint function maps the scheduler's targetEndpoint 6 + * (an HTTP URL like "https://alice.pds.example") to the primary transport's 7 + * format (e.g. a libp2p multiaddr). If it returns null, the primary is 8 + * skipped and the fallback is used directly. 9 + */ 10 + 11 + import type { StorageChallenge, StorageChallengeResponse } from "./types.js"; 12 + import type { ChallengeTransport } from "./transport.js"; 13 + 14 + export interface FailoverOptions { 15 + /** Map the scheduler's HTTP endpoint to the primary transport's endpoint format. */ 16 + resolveEndpoint?: (httpEndpoint: string) => string | null; 17 + /** Called when the primary transport fails and we fall back. */ 18 + onFallback?: (targetEndpoint: string, error: Error) => void; 19 + } 20 + 21 + export class FailoverChallengeTransport implements ChallengeTransport { 22 + constructor( 23 + private primary: ChallengeTransport, 24 + private fallback: ChallengeTransport, 25 + private options?: FailoverOptions, 26 + ) {} 27 + 28 + async sendChallenge( 29 + targetEndpoint: string, 30 + challenge: StorageChallenge, 31 + ): Promise<StorageChallengeResponse> { 32 + // Resolve the endpoint for the primary transport 33 + const primaryEndpoint = this.options?.resolveEndpoint 34 + ? this.options.resolveEndpoint(targetEndpoint) 35 + : targetEndpoint; 36 + 37 + // If resolver returned null, skip primary entirely 38 + if (primaryEndpoint !== null) { 39 + try { 40 + return await this.primary.sendChallenge(primaryEndpoint, challenge); 41 + } catch (err) { 42 + const error = err instanceof Error ? err : new Error(String(err)); 43 + this.options?.onFallback?.(targetEndpoint, error); 44 + } 45 + } 46 + 47 + return await this.fallback.sendChallenge(targetEndpoint, challenge); 48 + } 49 + 50 + onChallenge( 51 + handler: (challenge: StorageChallenge) => Promise<StorageChallengeResponse>, 52 + ): void { 53 + this.primary.onChallenge(handler); 54 + this.fallback.onChallenge(handler); 55 + } 56 + 57 + async stop(): Promise<void> { 58 + if ("stop" in this.primary && typeof (this.primary as any).stop === "function") { 59 + await (this.primary as any).stop(); 60 + } 61 + if ("stop" in this.fallback && typeof (this.fallback as any).stop === "function") { 62 + await (this.fallback as any).stop(); 63 + } 64 + } 65 + }
+19 -4
src/server.ts
··· 19 19 import { PolicyEngine } from "./policy/engine.js"; 20 20 import { HttpChallengeTransport } from "./replication/challenge-response/http-transport.js"; 21 21 import { Libp2pChallengeTransport } from "./replication/challenge-response/libp2p-transport.js"; 22 + import { FailoverChallengeTransport } from "./replication/challenge-response/failover-transport.js"; 23 + import type { ChallengeTransport } from "./replication/challenge-response/transport.js"; 22 24 import type { Libp2p } from "@libp2p/interface"; 23 25 24 26 // Load configuration ··· 181 183 // Start challenge scheduler if policy engine is available 182 184 if (policyEngine) { 183 185 const libp2pNode = ipfsService?.getLibp2p(); 184 - const challengeTransport = libp2pNode 185 - ? new Libp2pChallengeTransport(libp2pNode as Libp2p) 186 - : new HttpChallengeTransport(); 186 + let challengeTransport: ChallengeTransport; 187 + if (libp2pNode) { 188 + const libp2pTransport = new Libp2pChallengeTransport(libp2pNode as Libp2p); 189 + const httpTransport = new HttpChallengeTransport(); 190 + challengeTransport = new FailoverChallengeTransport( 191 + libp2pTransport, 192 + httpTransport, 193 + { 194 + onFallback: (endpoint, error) => { 195 + console.log(pc.dim(` Challenge: libp2p failed for ${endpoint}, falling back to HTTP: ${error.message}`)); 196 + }, 197 + }, 198 + ); 199 + } else { 200 + challengeTransport = new HttpChallengeTransport(); 201 + } 187 202 replicationManager.startChallengeScheduler(challengeTransport); 188 - console.log(pc.dim(` Challenges: scheduler started (${libp2pNode ? "libp2p" : "HTTP"} transport)`)); 203 + console.log(pc.dim(` Challenges: scheduler started (${libp2pNode ? "libp2p+HTTP failover" : "HTTP"} transport)`)); 189 204 } 190 205 } catch (err) { 191 206 console.error(pc.red(` Replication startup failed:`), err);