atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

Wire endpoint-to-multiaddr resolution for challenge transport failover

Peer discovery now persists multiaddrs alongside peerId in the
replication_state table. The failover challenge transport's
resolveEndpoint hook queries SyncStorage to map PDS HTTP endpoints
to libp2p multiaddrs, enabling direct P2P challenges before falling
back to HTTP.

- Add peer_multiaddrs column to replication_state (with migration)
- Update updatePeerInfo/clearPeerInfo to handle multiaddrs
- Add getMultiaddrForPdsEndpoint() lookup (prefers /p2p/ addrs)
- Pass multiaddrs through PeerDiscovery → ReplicationManager → storage
- Wire resolveEndpoint closure in server.ts challenge transport setup

+129 -7
+2 -2
src/replication/replication-manager.ts
··· 429 429 this.syncStorage.upsertState({ did, pdsEndpoint }); 430 430 state = this.syncStorage.getState(did); 431 431 432 - // 2. Optionally refresh peer info 432 + // 2. Optionally refresh peer info (peerId + multiaddrs) 433 433 if (state && this.shouldRefreshPeerInfo(state)) { 434 434 try { 435 435 const peerInfo = await this.peerDiscovery.discoverPeer(did); 436 436 if (peerInfo) { 437 - this.syncStorage.updatePeerInfo(did, peerInfo.peerId); 437 + this.syncStorage.updatePeerInfo(did, peerInfo.peerId, peerInfo.multiaddrs); 438 438 } 439 439 } catch { 440 440 // Non-fatal: peer discovery is optional
+75
src/replication/replication.test.ts
··· 171 171 expect(state.peerInfoFetchedAt).toBeNull(); 172 172 }); 173 173 174 + it("stores and retrieves peer multiaddrs", () => { 175 + storage.upsertState({ 176 + did: "did:plc:test1", 177 + pdsEndpoint: "https://pds.example.com", 178 + }); 179 + 180 + const multiaddrs = [ 181 + "/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWTest", 182 + "/ip4/192.168.1.1/tcp/4001/p2p/12D3KooWTest", 183 + ]; 184 + storage.updatePeerInfo("did:plc:test1", "12D3KooWTest", multiaddrs); 185 + 186 + const state = storage.getState("did:plc:test1")!; 187 + expect(state.peerId).toBe("12D3KooWTest"); 188 + expect(state.peerMultiaddrs).toEqual(multiaddrs); 189 + }); 190 + 191 + it("clearPeerInfo also clears multiaddrs", () => { 192 + storage.upsertState({ 193 + did: "did:plc:test1", 194 + pdsEndpoint: "https://pds.example.com", 195 + }); 196 + 197 + storage.updatePeerInfo("did:plc:test1", "12D3KooWTest", ["/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWTest"]); 198 + storage.clearPeerInfo("did:plc:test1"); 199 + 200 + const state = storage.getState("did:plc:test1")!; 201 + expect(state.peerId).toBeNull(); 202 + expect(state.peerMultiaddrs).toEqual([]); 203 + }); 204 + 205 + it("getMultiaddrForPdsEndpoint returns multiaddr with /p2p/", () => { 206 + storage.upsertState({ 207 + did: "did:plc:test1", 208 + pdsEndpoint: "https://pds.example.com", 209 + }); 210 + 211 + storage.updatePeerInfo("did:plc:test1", "12D3KooWTest", [ 212 + "/ip4/127.0.0.1/tcp/4001", 213 + "/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWTest", 214 + ]); 215 + 216 + const ma = storage.getMultiaddrForPdsEndpoint("https://pds.example.com"); 217 + expect(ma).toBe("/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWTest"); 218 + }); 219 + 220 + it("getMultiaddrForPdsEndpoint returns null for unknown endpoint", () => { 221 + const ma = storage.getMultiaddrForPdsEndpoint("https://unknown.example.com"); 222 + expect(ma).toBeNull(); 223 + }); 224 + 225 + it("getMultiaddrForPdsEndpoint returns null when no multiaddrs stored", () => { 226 + storage.upsertState({ 227 + did: "did:plc:test1", 228 + pdsEndpoint: "https://pds.example.com", 229 + }); 230 + 231 + const ma = storage.getMultiaddrForPdsEndpoint("https://pds.example.com"); 232 + expect(ma).toBeNull(); 233 + }); 234 + 235 + it("getMultiaddrForPdsEndpoint falls back to first addr when none have /p2p/", () => { 236 + storage.upsertState({ 237 + did: "did:plc:test1", 238 + pdsEndpoint: "https://pds.example.com", 239 + }); 240 + 241 + storage.updatePeerInfo("did:plc:test1", "12D3KooWTest", [ 242 + "/ip4/127.0.0.1/tcp/4001", 243 + ]); 244 + 245 + const ma = storage.getMultiaddrForPdsEndpoint("https://pds.example.com"); 246 + expect(ma).toBe("/ip4/127.0.0.1/tcp/4001"); 247 + }); 248 + 174 249 it("getAllStates returns all entries sorted by DID", () => { 175 250 storage.upsertState({ 176 251 did: "did:plc:bbb",
+49 -5
src/replication/sync-storage.ts
··· 75 75 ); 76 76 `); 77 77 78 - // Migration: add root_cid column if missing (for existing databases) 78 + // Migrations: add columns if missing (for existing databases) 79 79 const columns = this.db 80 80 .prepare("PRAGMA table_info(replication_state)") 81 81 .all() as Array<{ name: string }>; 82 82 if (!columns.some((c) => c.name === "root_cid")) { 83 83 this.db.exec( 84 84 "ALTER TABLE replication_state ADD COLUMN root_cid TEXT", 85 + ); 86 + } 87 + if (!columns.some((c) => c.name === "peer_multiaddrs")) { 88 + this.db.exec( 89 + "ALTER TABLE replication_state ADD COLUMN peer_multiaddrs TEXT", 85 90 ); 86 91 } 87 92 } ··· 168 173 /** 169 174 * Update cached peer info for a DID. 170 175 */ 171 - updatePeerInfo(did: string, peerId: string | null): void { 176 + updatePeerInfo(did: string, peerId: string | null, multiaddrs?: string[]): void { 172 177 this.db 173 178 .prepare( 174 179 `UPDATE replication_state 175 - SET peer_id = ?, peer_info_fetched_at = datetime('now') 180 + SET peer_id = ?, peer_multiaddrs = ?, peer_info_fetched_at = datetime('now') 176 181 WHERE did = ?`, 177 182 ) 178 - .run(peerId, did); 183 + .run(peerId, multiaddrs && multiaddrs.length > 0 ? JSON.stringify(multiaddrs) : null, did); 179 184 } 180 185 181 186 /** ··· 185 190 this.db 186 191 .prepare( 187 192 `UPDATE replication_state 188 - SET peer_id = NULL, peer_info_fetched_at = NULL 193 + SET peer_id = NULL, peer_multiaddrs = NULL, peer_info_fetched_at = NULL 189 194 WHERE did = ?`, 190 195 ) 191 196 .run(did); ··· 456 461 .run(targetDid); 457 462 } 458 463 464 + /** 465 + * Look up the first available multiaddr for a PDS endpoint. 466 + * Searches replication_state rows matching the given PDS endpoint, 467 + * returning the first multiaddr that contains a /p2p/ component (peer ID). 468 + */ 469 + getMultiaddrForPdsEndpoint(pdsEndpoint: string): string | null { 470 + const rows = this.db 471 + .prepare( 472 + `SELECT peer_multiaddrs FROM replication_state 473 + WHERE pds_endpoint = ? AND peer_multiaddrs IS NOT NULL 474 + LIMIT 5`, 475 + ) 476 + .all(pdsEndpoint) as Array<{ peer_multiaddrs: string }>; 477 + 478 + for (const row of rows) { 479 + try { 480 + const addrs = JSON.parse(row.peer_multiaddrs) as string[]; 481 + // Prefer multiaddrs that include /p2p/ (have peer ID) 482 + const withPeerId = addrs.find((a) => a.includes("/p2p/")); 483 + if (withPeerId) return withPeerId; 484 + // Fall back to first addr if none have /p2p/ 485 + if (addrs.length > 0) return addrs[0]!; 486 + } catch { 487 + // Malformed JSON, skip 488 + } 489 + } 490 + return null; 491 + } 492 + 459 493 private rowToState(row: Record<string, unknown>): SyncState { 494 + let peerMultiaddrs: string[] = []; 495 + if (typeof row.peer_multiaddrs === "string") { 496 + try { 497 + peerMultiaddrs = JSON.parse(row.peer_multiaddrs) as string[]; 498 + } catch { 499 + // Malformed JSON, default to empty 500 + } 501 + } 502 + 460 503 return { 461 504 did: row.did as string, 462 505 pdsEndpoint: row.pds_endpoint as string, 463 506 peerId: (row.peer_id as string) ?? null, 507 + peerMultiaddrs, 464 508 peerInfoFetchedAt: (row.peer_info_fetched_at as string) ?? null, 465 509 lastSyncRev: (row.last_sync_rev as string) ?? null, 466 510 rootCid: (row.root_cid as string) ?? null,
+1
src/replication/types.ts
··· 40 40 did: string; 41 41 pdsEndpoint: string; 42 42 peerId: string | null; 43 + peerMultiaddrs: string[]; 43 44 peerInfoFetchedAt: string | null; 44 45 lastSyncRev: string | null; 45 46 rootCid: string | null;
+2
src/server.ts
··· 187 187 if (libp2pNode) { 188 188 const libp2pTransport = new Libp2pChallengeTransport(libp2pNode as Libp2p); 189 189 const httpTransport = new HttpChallengeTransport(); 190 + const syncStorage = replicationManager.getSyncStorage(); 190 191 challengeTransport = new FailoverChallengeTransport( 191 192 libp2pTransport, 192 193 httpTransport, 193 194 { 195 + resolveEndpoint: (httpEndpoint) => syncStorage.getMultiaddrForPdsEndpoint(httpEndpoint), 194 196 onFallback: (endpoint, error) => { 195 197 console.log(pc.dim(` Challenge: libp2p failed for ${endpoint}, falling back to HTTP: ${error.message}`)); 196 198 },