atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add peer multiaddr freshness and DID-to-PeerID staleness detection

Detect and log PeerID changes during sync, trigger immediate re-discovery
on connection failure, republish identity when multiaddrs change, cache
observed libp2p addrs, and broadcast identity changes via gossipsub
(/p2pds/identity/1/{did} topics).

+966 -20
+138 -16
src/ipfs.ts
··· 37 37 } 38 38 39 39 /** 40 + * Lightweight gossipsub notification for a peer identity change. 41 + * Published when a peer's multiaddrs or PeerID changes. 42 + */ 43 + export interface IdentityNotification { 44 + did: string; 45 + peerId: string; 46 + multiaddrs: string[]; 47 + time: string; 48 + } 49 + 50 + export type IdentityNotificationHandler = (notification: IdentityNotification) => void | Promise<void>; 51 + 52 + export const IDENTITY_TOPIC_PREFIX = "/p2pds/identity/1/"; 53 + 54 + export function identityTopic(did: string): string { 55 + return `${IDENTITY_TOPIC_PREFIX}${did}`; 56 + } 57 + 58 + /** 40 59 * P2P networking: content routing, peer identity, connectivity, gossipsub. 41 60 * Separated from storage so transports can be swapped independently. 42 61 */ ··· 53 72 getPeerId(): string | null; 54 73 getMultiaddrs(): string[]; 55 74 getConnectionCount(): number; 75 + getRemoteAddrs(peerId: string): string[]; 76 + publishIdentityNotification(did: string, peerId: string, multiaddrs: string[]): Promise<void>; 77 + onIdentityNotification(handler: IdentityNotificationHandler): void; 78 + subscribeIdentityTopics(dids: string[]): void; 79 + unsubscribeIdentityTopics(dids: string[]): void; 56 80 } 57 81 58 82 export interface IpfsConfig { ··· 77 101 private config: IpfsConfig; 78 102 private running = false; 79 103 private commitHandlers: CommitNotificationHandler[] = []; 104 + private identityHandlers: IdentityNotificationHandler[] = []; 80 105 private subscribedTopics: Set<string> = new Set(); 81 106 82 107 constructor(config: IpfsConfig) { ··· 273 298 } 274 299 275 300 /** 301 + * Publish an identity notification via gossipsub. 302 + * CBOR-encodes { did, peerId, multiaddrs, time } and publishes to the DID's identity topic. 303 + */ 304 + async publishIdentityNotification( 305 + did: string, 306 + peerId: string, 307 + multiaddrs: string[], 308 + ): Promise<void> { 309 + if (!this.helia) return; 310 + const pubsub = (this.helia.libp2p.services as Record<string, unknown>).pubsub as 311 + { publish(topic: string, data: Uint8Array): Promise<unknown> } | undefined; 312 + if (!pubsub) return; 313 + 314 + const notification: IdentityNotification = { 315 + did, 316 + peerId, 317 + multiaddrs, 318 + time: new Date().toISOString(), 319 + }; 320 + 321 + const data = cborEncode(notification); 322 + await pubsub.publish(identityTopic(did), data); 323 + } 324 + 325 + /** 326 + * Register a handler for incoming identity notifications. 327 + */ 328 + onIdentityNotification(handler: IdentityNotificationHandler): void { 329 + this.identityHandlers.push(handler); 330 + } 331 + 332 + /** 333 + * Subscribe to identity gossipsub topics for the given DIDs. 334 + */ 335 + subscribeIdentityTopics(dids: string[]): void { 336 + if (!this.helia) return; 337 + const pubsub = (this.helia.libp2p.services as Record<string, unknown>).pubsub as 338 + { subscribe(topic: string): void } | undefined; 339 + if (!pubsub) return; 340 + 341 + for (const did of dids) { 342 + const topic = identityTopic(did); 343 + if (!this.subscribedTopics.has(topic)) { 344 + pubsub.subscribe(topic); 345 + this.subscribedTopics.add(topic); 346 + } 347 + } 348 + } 349 + 350 + /** 351 + * Unsubscribe from identity gossipsub topics for the given DIDs. 352 + */ 353 + unsubscribeIdentityTopics(dids: string[]): void { 354 + if (!this.helia) return; 355 + const pubsub = (this.helia.libp2p.services as Record<string, unknown>).pubsub as 356 + { unsubscribe(topic: string): void } | undefined; 357 + if (!pubsub) return; 358 + 359 + for (const did of dids) { 360 + const topic = identityTopic(did); 361 + if (this.subscribedTopics.has(topic)) { 362 + pubsub.unsubscribe(topic); 363 + this.subscribedTopics.delete(topic); 364 + } 365 + } 366 + } 367 + 368 + /** 276 369 * Set up the gossipsub message handler. 277 370 * Listens for "message" events, CBOR-decodes, and dispatches to all registered handlers. 278 371 */ ··· 285 378 pubsub.addEventListener("message", (evt: unknown) => { 286 379 try { 287 380 const detail = (evt as { detail: { topic: string; data: Uint8Array } }).detail; 288 - if (!detail.topic.startsWith(COMMIT_TOPIC_PREFIX)) return; 381 + 382 + if (detail.topic.startsWith(COMMIT_TOPIC_PREFIX)) { 383 + const notification = cborDecode(detail.data) as CommitNotification; 384 + if ( 385 + typeof notification.did !== "string" || 386 + typeof notification.commit !== "string" || 387 + typeof notification.rev !== "string" 388 + ) { 389 + return; 390 + } 289 391 290 - const notification = cborDecode(detail.data) as CommitNotification; 291 - if ( 292 - typeof notification.did !== "string" || 293 - typeof notification.commit !== "string" || 294 - typeof notification.rev !== "string" 295 - ) { 296 - return; 297 - } 392 + for (const handler of this.commitHandlers) { 393 + try { 394 + const result = handler(notification); 395 + if (result && typeof (result as Promise<void>).catch === "function") { 396 + (result as Promise<void>).catch(() => {}); 397 + } 398 + } catch { 399 + // Individual handler errors don't affect other handlers 400 + } 401 + } 402 + } else if (detail.topic.startsWith(IDENTITY_TOPIC_PREFIX)) { 403 + const notification = cborDecode(detail.data) as IdentityNotification; 404 + if ( 405 + typeof notification.did !== "string" || 406 + typeof notification.peerId !== "string" || 407 + !Array.isArray(notification.multiaddrs) 408 + ) { 409 + return; 410 + } 298 411 299 - for (const handler of this.commitHandlers) { 300 - try { 301 - const result = handler(notification); 302 - if (result && typeof (result as Promise<void>).catch === "function") { 303 - (result as Promise<void>).catch(() => {}); 412 + for (const handler of this.identityHandlers) { 413 + try { 414 + const result = handler(notification); 415 + if (result && typeof (result as Promise<void>).catch === "function") { 416 + (result as Promise<void>).catch(() => {}); 417 + } 418 + } catch { 419 + // Individual handler errors don't affect other handlers 304 420 } 305 - } catch { 306 - // Individual handler errors don't affect other handlers 307 421 } 308 422 } 309 423 } catch { ··· 325 439 getConnectionCount(): number { 326 440 if (!this.helia) return 0; 327 441 return this.helia.libp2p.getConnections().length; 442 + } 443 + 444 + getRemoteAddrs(peerId: string): string[] { 445 + if (!this.helia) return []; 446 + return this.helia.libp2p 447 + .getConnections() 448 + .filter((conn) => conn.remotePeer.toString() === peerId) 449 + .map((conn) => conn.remoteAddr.toString()); 328 450 } 329 451 330 452 isRunning(): boolean {
+5
src/replication/gossipsub-notifications.test.ts
··· 248 248 getPeerId: vi.fn().mockReturnValue("12D3KooWMockPeer"), 249 249 getMultiaddrs: vi.fn().mockReturnValue(["/ip4/127.0.0.1/tcp/4001"]), 250 250 getConnectionCount: vi.fn().mockReturnValue(0), 251 + getRemoteAddrs: vi.fn().mockReturnValue([]), 252 + publishIdentityNotification: vi.fn().mockResolvedValue(undefined), 253 + onIdentityNotification: vi.fn(), 254 + subscribeIdentityTopics: vi.fn(), 255 + unsubscribeIdentityTopics: vi.fn(), 251 256 // Test helpers 252 257 _handlers: handlers, 253 258 _subscribedDids: subscribedDids,
+725
src/replication/peer-freshness.test.ts
··· 1 + /** 2 + * Peer multiaddr freshness + DID-to-PeerID staleness detection tests. 3 + * 4 + * Tests: PeerID change detection, refreshPeerInfoForEndpoint, 5 + * republish on multiaddr change, getRemoteAddrs, identity notifications. 6 + */ 7 + 8 + import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; 9 + import { mkdtempSync, rmSync } from "node:fs"; 10 + import { tmpdir } from "node:os"; 11 + import { join } from "node:path"; 12 + import Database from "better-sqlite3"; 13 + import { 14 + encode as cborEncode, 15 + decode as cborDecode, 16 + } from "../cbor-compat.js"; 17 + import { 18 + identityTopic, 19 + IDENTITY_TOPIC_PREFIX, 20 + type IdentityNotification, 21 + type CommitNotification, 22 + } from "../ipfs.js"; 23 + import type { Config } from "../config.js"; 24 + 25 + // ============================================ 26 + // Helpers 27 + // ============================================ 28 + 29 + function testConfig(dataDir: string, replicateDids: string[] = []): Config { 30 + return { 31 + DID: "did:plc:local", 32 + HANDLE: "test.example.com", 33 + PDS_HOSTNAME: "test.example.com", 34 + AUTH_TOKEN: "test", 35 + SIGNING_KEY: 36 + "0000000000000000000000000000000000000000000000000000000000000001", 37 + SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr", 38 + JWT_SECRET: "test", 39 + PASSWORD_HASH: "$2a$10$test", 40 + DATA_DIR: dataDir, 41 + PORT: 3000, 42 + IPFS_ENABLED: true, 43 + IPFS_NETWORKING: false, 44 + REPLICATE_DIDS: replicateDids, 45 + FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 46 + FIREHOSE_ENABLED: false, 47 + }; 48 + } 49 + 50 + type IdentityHandler = (n: IdentityNotification) => void | Promise<void>; 51 + 52 + function createMockNetworkService() { 53 + const commitHandlers: Array<(n: CommitNotification) => void | Promise<void>> = []; 54 + const identityHandlers: IdentityHandler[] = []; 55 + const subscribedCommitDids: string[] = []; 56 + const subscribedIdentityDids: string[] = []; 57 + const unsubscribedIdentityDids: string[] = []; 58 + const publishedIdentity: IdentityNotification[] = []; 59 + let multiaddrs = ["/ip4/127.0.0.1/tcp/4001"]; 60 + let remoteAddrsMap: Record<string, string[]> = {}; 61 + 62 + return { 63 + provideBlocks: vi.fn().mockResolvedValue(undefined), 64 + publishCommitNotification: vi.fn().mockResolvedValue(undefined), 65 + onCommitNotification: vi.fn().mockImplementation( 66 + (handler: (n: CommitNotification) => void | Promise<void>) => { 67 + commitHandlers.push(handler); 68 + }, 69 + ), 70 + subscribeCommitTopics: vi.fn().mockImplementation( 71 + (dids: string[]) => { subscribedCommitDids.push(...dids); }, 72 + ), 73 + unsubscribeCommitTopics: vi.fn(), 74 + publishIdentityNotification: vi.fn().mockImplementation( 75 + async (did: string, peerId: string, addrs: string[]) => { 76 + publishedIdentity.push({ did, peerId, multiaddrs: addrs, time: new Date().toISOString() }); 77 + }, 78 + ), 79 + onIdentityNotification: vi.fn().mockImplementation( 80 + (handler: IdentityHandler) => { identityHandlers.push(handler); }, 81 + ), 82 + subscribeIdentityTopics: vi.fn().mockImplementation( 83 + (dids: string[]) => { subscribedIdentityDids.push(...dids); }, 84 + ), 85 + unsubscribeIdentityTopics: vi.fn().mockImplementation( 86 + (dids: string[]) => { unsubscribedIdentityDids.push(...dids); }, 87 + ), 88 + getPeerId: vi.fn().mockReturnValue("12D3KooWMockPeer"), 89 + getMultiaddrs: vi.fn().mockImplementation(() => multiaddrs), 90 + getConnectionCount: vi.fn().mockReturnValue(0), 91 + getRemoteAddrs: vi.fn().mockImplementation((peerId: string) => remoteAddrsMap[peerId] ?? []), 92 + // Test helpers 93 + _commitHandlers: commitHandlers, 94 + _identityHandlers: identityHandlers, 95 + _subscribedCommitDids: subscribedCommitDids, 96 + _subscribedIdentityDids: subscribedIdentityDids, 97 + _unsubscribedIdentityDids: unsubscribedIdentityDids, 98 + _publishedIdentity: publishedIdentity, 99 + _setMultiaddrs(addrs: string[]) { multiaddrs = addrs; }, 100 + _setRemoteAddrs(peerId: string, addrs: string[]) { remoteAddrsMap[peerId] = addrs; }, 101 + _simulateIdentityNotification(notification: IdentityNotification) { 102 + for (const handler of identityHandlers) { 103 + handler(notification); 104 + } 105 + }, 106 + }; 107 + } 108 + 109 + function createMockBlockStore() { 110 + return { 111 + putBlock: vi.fn().mockResolvedValue(undefined), 112 + getBlock: vi.fn().mockResolvedValue(null), 113 + hasBlock: vi.fn().mockResolvedValue(false), 114 + putBlocks: vi.fn().mockResolvedValue(undefined), 115 + }; 116 + } 117 + 118 + // ============================================ 119 + // IdentityNotification encoding + topic tests 120 + // ============================================ 121 + 122 + describe("IdentityNotification encoding", () => { 123 + it("CBOR encode/decode round-trip", () => { 124 + const notification: IdentityNotification = { 125 + did: "did:plc:abc123", 126 + peerId: "12D3KooWTest", 127 + multiaddrs: ["/ip4/127.0.0.1/tcp/4001", "/ip4/192.168.1.1/tcp/4001"], 128 + time: "2024-01-01T00:00:00.000Z", 129 + }; 130 + 131 + const encoded = cborEncode(notification); 132 + expect(encoded).toBeInstanceOf(Uint8Array); 133 + expect(encoded.length).toBeGreaterThan(0); 134 + 135 + const decoded = cborDecode(encoded) as IdentityNotification; 136 + expect(decoded.did).toBe(notification.did); 137 + expect(decoded.peerId).toBe(notification.peerId); 138 + expect(decoded.multiaddrs).toEqual(notification.multiaddrs); 139 + expect(decoded.time).toBe(notification.time); 140 + }); 141 + 142 + it("identityTopic() produces correct topic strings", () => { 143 + const did = "did:plc:abc123"; 144 + const topic = identityTopic(did); 145 + expect(topic).toBe("/p2pds/identity/1/did:plc:abc123"); 146 + expect(topic.startsWith(IDENTITY_TOPIC_PREFIX)).toBe(true); 147 + }); 148 + 149 + it("different DIDs produce different identity topics", () => { 150 + const topic1 = identityTopic("did:plc:aaa"); 151 + const topic2 = identityTopic("did:plc:bbb"); 152 + expect(topic1).not.toBe(topic2); 153 + }); 154 + }); 155 + 156 + // ============================================ 157 + // PeerID change detection 158 + // ============================================ 159 + 160 + describe("PeerID change detection", () => { 161 + let tmpDir: string; 162 + 163 + beforeEach(() => { 164 + tmpDir = mkdtempSync(join(tmpdir(), "peer-freshness-test-")); 165 + }); 166 + 167 + afterEach(() => { 168 + rmSync(tmpDir, { recursive: true, force: true }); 169 + }); 170 + 171 + it("logs warning when PeerID changes during syncDid()", async () => { 172 + const db = new Database(join(tmpDir, "test.db")); 173 + const config = testConfig(tmpDir, ["did:plc:remote1"]); 174 + 175 + const { RepoManager } = await import("../repo-manager.js"); 176 + const { ReplicationManager } = await import("./replication-manager.js"); 177 + const { DidResolver } = await import("../did-resolver.js"); 178 + 179 + const repoManager = new RepoManager(db, config); 180 + repoManager.init(); 181 + 182 + const mockNet = createMockNetworkService(); 183 + const mockBlocks = createMockBlockStore(); 184 + const didResolver = new DidResolver(); 185 + 186 + const manager = new ReplicationManager( 187 + db, config, repoManager, mockBlocks, mockNet, didResolver, 188 + ); 189 + 190 + try { 191 + await manager.init(); 192 + 193 + // Seed sync state with an existing peerId 194 + const storage = manager.getSyncStorage(); 195 + storage.upsertState({ did: "did:plc:remote1", pdsEndpoint: "https://pds.example.com" }); 196 + storage.updatePeerInfo("did:plc:remote1", "12D3KooWOldPeer", ["/ip4/1.2.3.4/tcp/4001"]); 197 + // Set peerInfoFetchedAt to null so shouldRefreshPeerInfo returns true 198 + db.prepare("UPDATE replication_state SET peer_info_fetched_at = NULL WHERE did = ?") 199 + .run("did:plc:remote1"); 200 + 201 + // Mock peer discovery to return a new PeerID 202 + const { PeerDiscovery } = await import("./peer-discovery.js"); 203 + vi.spyOn(PeerDiscovery.prototype, "discoverPeer").mockResolvedValue({ 204 + pdsEndpoint: "https://pds.example.com", 205 + peerId: "12D3KooWNewPeer", 206 + multiaddrs: ["/ip4/5.6.7.8/tcp/4001"], 207 + }); 208 + 209 + // Mock repo fetcher 210 + const { RepoFetcher } = await import("./repo-fetcher.js"); 211 + vi.spyOn(RepoFetcher.prototype, "resolvePds").mockResolvedValue("https://pds.example.com"); 212 + vi.spyOn(RepoFetcher.prototype, "fetchRepo").mockRejectedValue(new Error("test skip")); 213 + 214 + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); 215 + 216 + try { 217 + await manager.syncDid("did:plc:remote1"); 218 + } catch { 219 + // Expected to fail since fetchRepo is mocked to reject 220 + } 221 + 222 + // Check that PeerID change warning was logged 223 + const peerIdChangeWarning = warnSpy.mock.calls.find( 224 + (call) => typeof call[0] === "string" && call[0].includes("PeerID changed"), 225 + ); 226 + expect(peerIdChangeWarning).toBeDefined(); 227 + expect(peerIdChangeWarning![0]).toContain("12D3KooWOldPeer"); 228 + expect(peerIdChangeWarning![0]).toContain("12D3KooWNewPeer"); 229 + 230 + warnSpy.mockRestore(); 231 + vi.restoreAllMocks(); 232 + } finally { 233 + manager.stop(); 234 + db.close(); 235 + } 236 + }); 237 + }); 238 + 239 + // ============================================ 240 + // refreshPeerInfoForEndpoint 241 + // ============================================ 242 + 243 + describe("refreshPeerInfoForEndpoint", () => { 244 + let tmpDir: string; 245 + 246 + beforeEach(() => { 247 + tmpDir = mkdtempSync(join(tmpdir(), "peer-refresh-test-")); 248 + }); 249 + 250 + afterEach(() => { 251 + rmSync(tmpDir, { recursive: true, force: true }); 252 + }); 253 + 254 + it("clears stale peer info and triggers re-discovery for matching DIDs", async () => { 255 + const db = new Database(join(tmpDir, "test.db")); 256 + const config = testConfig(tmpDir, ["did:plc:remote1", "did:plc:remote2"]); 257 + 258 + const { RepoManager } = await import("../repo-manager.js"); 259 + const { ReplicationManager } = await import("./replication-manager.js"); 260 + const { DidResolver } = await import("../did-resolver.js"); 261 + const { PeerDiscovery } = await import("./peer-discovery.js"); 262 + 263 + const repoManager = new RepoManager(db, config); 264 + repoManager.init(); 265 + 266 + const mockNet = createMockNetworkService(); 267 + const mockBlocks = createMockBlockStore(); 268 + const didResolver = new DidResolver(); 269 + 270 + const manager = new ReplicationManager( 271 + db, config, repoManager, mockBlocks, mockNet, didResolver, 272 + ); 273 + 274 + try { 275 + await manager.init(); 276 + 277 + const storage = manager.getSyncStorage(); 278 + // Set up two DIDs with different endpoints 279 + storage.upsertState({ did: "did:plc:remote1", pdsEndpoint: "https://pds1.example.com" }); 280 + storage.updatePeerInfo("did:plc:remote1", "12D3KooWPeer1", ["/ip4/1.2.3.4/tcp/4001"]); 281 + 282 + storage.upsertState({ did: "did:plc:remote2", pdsEndpoint: "https://pds2.example.com" }); 283 + storage.updatePeerInfo("did:plc:remote2", "12D3KooWPeer2", ["/ip4/5.6.7.8/tcp/4001"]); 284 + 285 + const discoverSpy = vi.spyOn(PeerDiscovery.prototype, "discoverPeer").mockResolvedValue({ 286 + pdsEndpoint: "https://pds1.example.com", 287 + peerId: "12D3KooWNewPeer1", 288 + multiaddrs: ["/ip4/10.0.0.1/tcp/4001"], 289 + }); 290 + 291 + // Refresh for pds1 endpoint only 292 + manager.refreshPeerInfoForEndpoint("https://pds1.example.com"); 293 + 294 + // Wait for async discovery to complete 295 + await new Promise((r) => setTimeout(r, 200)); 296 + 297 + // discoverPeer should only have been called for remote1 298 + expect(discoverSpy).toHaveBeenCalledWith("did:plc:remote1"); 299 + expect(discoverSpy).not.toHaveBeenCalledWith("did:plc:remote2"); 300 + 301 + // remote1 should have new peer info 302 + const state1 = storage.getState("did:plc:remote1"); 303 + expect(state1!.peerId).toBe("12D3KooWNewPeer1"); 304 + 305 + // remote2 should be untouched 306 + const state2 = storage.getState("did:plc:remote2"); 307 + expect(state2!.peerId).toBe("12D3KooWPeer2"); 308 + 309 + discoverSpy.mockRestore(); 310 + } finally { 311 + manager.stop(); 312 + db.close(); 313 + } 314 + }); 315 + }); 316 + 317 + // ============================================ 318 + // Republish on multiaddr change 319 + // ============================================ 320 + 321 + describe("republish on multiaddr change", () => { 322 + let tmpDir: string; 323 + 324 + beforeEach(() => { 325 + tmpDir = mkdtempSync(join(tmpdir(), "peer-republish-test-")); 326 + }); 327 + 328 + afterEach(() => { 329 + rmSync(tmpDir, { recursive: true, force: true }); 330 + }); 331 + 332 + it("republishes peer identity when multiaddrs change", async () => { 333 + const db = new Database(join(tmpDir, "test.db")); 334 + const config = testConfig(tmpDir, ["did:plc:remote1"]); 335 + 336 + const { RepoManager } = await import("../repo-manager.js"); 337 + const { ReplicationManager } = await import("./replication-manager.js"); 338 + const { DidResolver } = await import("../did-resolver.js"); 339 + 340 + const repoManager = new RepoManager(db, config); 341 + repoManager.init(); 342 + 343 + const mockNet = createMockNetworkService(); 344 + const mockBlocks = createMockBlockStore(); 345 + const didResolver = new DidResolver(); 346 + 347 + const manager = new ReplicationManager( 348 + db, config, repoManager, mockBlocks, mockNet, didResolver, 349 + ); 350 + 351 + try { 352 + await manager.init(); 353 + 354 + // After init(), publishPeerIdentity was called, storing lastPublishedMultiaddrs 355 + const putRecordSpy = vi.spyOn(repoManager, "putRecord"); 356 + putRecordSpy.mockClear(); 357 + 358 + // Change multiaddrs 359 + mockNet._setMultiaddrs(["/ip4/127.0.0.1/tcp/4001", "/ip4/192.168.1.1/tcp/4002"]); 360 + 361 + // Mock syncDid to avoid actual repo fetch 362 + vi.spyOn(manager, "syncDid").mockResolvedValue(undefined); 363 + 364 + await manager.syncAll(); 365 + 366 + // putRecord should have been called for republish (org.p2pds.peer/self) 367 + const peerRecordCalls = putRecordSpy.mock.calls.filter( 368 + (call) => call[0] === "org.p2pds.peer" && call[1] === "self", 369 + ); 370 + expect(peerRecordCalls.length).toBe(1); 371 + 372 + vi.restoreAllMocks(); 373 + } finally { 374 + manager.stop(); 375 + db.close(); 376 + } 377 + }); 378 + 379 + it("skips republish when multiaddrs are unchanged", async () => { 380 + const db = new Database(join(tmpDir, "test-skip.db")); 381 + const config = testConfig(tmpDir, ["did:plc:remote1"]); 382 + 383 + const { RepoManager } = await import("../repo-manager.js"); 384 + const { ReplicationManager } = await import("./replication-manager.js"); 385 + const { DidResolver } = await import("../did-resolver.js"); 386 + 387 + const repoManager = new RepoManager(db, config); 388 + repoManager.init(); 389 + 390 + const mockNet = createMockNetworkService(); 391 + const mockBlocks = createMockBlockStore(); 392 + const didResolver = new DidResolver(); 393 + 394 + const manager = new ReplicationManager( 395 + db, config, repoManager, mockBlocks, mockNet, didResolver, 396 + ); 397 + 398 + try { 399 + await manager.init(); 400 + 401 + const putRecordSpy = vi.spyOn(repoManager, "putRecord"); 402 + putRecordSpy.mockClear(); 403 + 404 + // Don't change multiaddrs — keep same as init() 405 + 406 + // Mock syncDid 407 + vi.spyOn(manager, "syncDid").mockResolvedValue(undefined); 408 + 409 + await manager.syncAll(); 410 + 411 + // No republish should occur 412 + const peerRecordCalls = putRecordSpy.mock.calls.filter( 413 + (call) => call[0] === "org.p2pds.peer" && call[1] === "self", 414 + ); 415 + expect(peerRecordCalls.length).toBe(0); 416 + 417 + vi.restoreAllMocks(); 418 + } finally { 419 + manager.stop(); 420 + db.close(); 421 + } 422 + }); 423 + }); 424 + 425 + // ============================================ 426 + // getRemoteAddrs 427 + // ============================================ 428 + 429 + describe("getRemoteAddrs", () => { 430 + it("returns empty array for unknown peer", () => { 431 + const mockNet = createMockNetworkService(); 432 + const result = mockNet.getRemoteAddrs("12D3KooWUnknown"); 433 + expect(result).toEqual([]); 434 + }); 435 + 436 + it("returns configured addrs for known peer", () => { 437 + const mockNet = createMockNetworkService(); 438 + mockNet._setRemoteAddrs("12D3KooWKnown", ["/ip4/1.2.3.4/tcp/5555"]); 439 + const result = mockNet.getRemoteAddrs("12D3KooWKnown"); 440 + expect(result).toEqual(["/ip4/1.2.3.4/tcp/5555"]); 441 + }); 442 + }); 443 + 444 + // ============================================ 445 + // Observed addrs merged into stored addrs 446 + // ============================================ 447 + 448 + describe("observed addr merging", () => { 449 + let tmpDir: string; 450 + 451 + beforeEach(() => { 452 + tmpDir = mkdtempSync(join(tmpdir(), "peer-merge-test-")); 453 + }); 454 + 455 + afterEach(() => { 456 + rmSync(tmpDir, { recursive: true, force: true }); 457 + }); 458 + 459 + it("merges observed addrs into stored addrs during syncDid", async () => { 460 + const db = new Database(join(tmpDir, "test.db")); 461 + const config = testConfig(tmpDir, ["did:plc:remote1"]); 462 + 463 + const { RepoManager } = await import("../repo-manager.js"); 464 + const { ReplicationManager } = await import("./replication-manager.js"); 465 + const { DidResolver } = await import("../did-resolver.js"); 466 + const { PeerDiscovery } = await import("./peer-discovery.js"); 467 + const { RepoFetcher } = await import("./repo-fetcher.js"); 468 + 469 + const repoManager = new RepoManager(db, config); 470 + repoManager.init(); 471 + 472 + const mockNet = createMockNetworkService(); 473 + const mockBlocks = createMockBlockStore(); 474 + const didResolver = new DidResolver(); 475 + 476 + const manager = new ReplicationManager( 477 + db, config, repoManager, mockBlocks, mockNet, didResolver, 478 + ); 479 + 480 + try { 481 + await manager.init(); 482 + 483 + const storage = manager.getSyncStorage(); 484 + storage.upsertState({ did: "did:plc:remote1", pdsEndpoint: "https://pds.example.com" }); 485 + // Clear peer_info_fetched_at so refresh triggers 486 + db.prepare("UPDATE replication_state SET peer_info_fetched_at = NULL WHERE did = ?") 487 + .run("did:plc:remote1"); 488 + 489 + // Discovery returns peerId + one addr 490 + vi.spyOn(PeerDiscovery.prototype, "discoverPeer").mockResolvedValue({ 491 + pdsEndpoint: "https://pds.example.com", 492 + peerId: "12D3KooWPeerX", 493 + multiaddrs: ["/ip4/1.2.3.4/tcp/4001"], 494 + }); 495 + 496 + // Active connection has an additional observed addr 497 + mockNet._setRemoteAddrs("12D3KooWPeerX", ["/ip4/5.6.7.8/tcp/9999"]); 498 + 499 + // Mock fetchRepo to reject so we can inspect state after peer discovery 500 + vi.spyOn(RepoFetcher.prototype, "resolvePds").mockResolvedValue("https://pds.example.com"); 501 + vi.spyOn(RepoFetcher.prototype, "fetchRepo").mockRejectedValue(new Error("test skip")); 502 + 503 + // Spy on updatePeerInfo to capture the merged call 504 + const updateSpy = vi.spyOn(storage, "updatePeerInfo"); 505 + 506 + try { 507 + await manager.syncDid("did:plc:remote1"); 508 + } catch { 509 + // Expected 510 + } 511 + 512 + // updatePeerInfo should have been called with the merged addrs 513 + // (before clearPeerInfo in the error handler wipes them) 514 + const mergedCall = updateSpy.mock.calls.find( 515 + (call) => call[2] && (call[2] as string[]).length === 2, 516 + ); 517 + expect(mergedCall).toBeDefined(); 518 + expect(mergedCall![2]).toContain("/ip4/1.2.3.4/tcp/4001"); 519 + expect(mergedCall![2]).toContain("/ip4/5.6.7.8/tcp/9999"); 520 + 521 + vi.restoreAllMocks(); 522 + } finally { 523 + manager.stop(); 524 + db.close(); 525 + } 526 + }); 527 + }); 528 + 529 + // ============================================ 530 + // Identity notification integration 531 + // ============================================ 532 + 533 + describe("Identity notification integration", () => { 534 + let tmpDir: string; 535 + 536 + beforeEach(() => { 537 + tmpDir = mkdtempSync(join(tmpdir(), "peer-identity-test-")); 538 + }); 539 + 540 + afterEach(() => { 541 + rmSync(tmpDir, { recursive: true, force: true }); 542 + }); 543 + 544 + it("init() subscribes to identity topics for tracked DIDs", async () => { 545 + const db = new Database(join(tmpDir, "test.db")); 546 + const config = testConfig(tmpDir, ["did:plc:remote1", "did:plc:remote2"]); 547 + 548 + const { RepoManager } = await import("../repo-manager.js"); 549 + const { ReplicationManager } = await import("./replication-manager.js"); 550 + const { DidResolver } = await import("../did-resolver.js"); 551 + 552 + const repoManager = new RepoManager(db, config); 553 + repoManager.init(); 554 + 555 + const mockNet = createMockNetworkService(); 556 + const mockBlocks = createMockBlockStore(); 557 + const didResolver = new DidResolver(); 558 + 559 + const manager = new ReplicationManager( 560 + db, config, repoManager, mockBlocks, mockNet, didResolver, 561 + ); 562 + 563 + try { 564 + await manager.init(); 565 + 566 + expect(mockNet.subscribeIdentityTopics).toHaveBeenCalled(); 567 + expect(mockNet._subscribedIdentityDids).toContain("did:plc:remote1"); 568 + expect(mockNet._subscribedIdentityDids).toContain("did:plc:remote2"); 569 + 570 + expect(mockNet.onIdentityNotification).toHaveBeenCalled(); 571 + expect(mockNet._identityHandlers.length).toBeGreaterThan(0); 572 + } finally { 573 + manager.stop(); 574 + db.close(); 575 + } 576 + }); 577 + 578 + it("identity notification updates peer info immediately", async () => { 579 + const db = new Database(join(tmpDir, "test-update.db")); 580 + const config = testConfig(tmpDir, ["did:plc:remote1"]); 581 + 582 + const { RepoManager } = await import("../repo-manager.js"); 583 + const { ReplicationManager } = await import("./replication-manager.js"); 584 + const { DidResolver } = await import("../did-resolver.js"); 585 + 586 + const repoManager = new RepoManager(db, config); 587 + repoManager.init(); 588 + 589 + const mockNet = createMockNetworkService(); 590 + const mockBlocks = createMockBlockStore(); 591 + const didResolver = new DidResolver(); 592 + 593 + const manager = new ReplicationManager( 594 + db, config, repoManager, mockBlocks, mockNet, didResolver, 595 + ); 596 + 597 + try { 598 + await manager.init(); 599 + 600 + const storage = manager.getSyncStorage(); 601 + storage.upsertState({ did: "did:plc:remote1", pdsEndpoint: "https://pds.example.com" }); 602 + storage.updatePeerInfo("did:plc:remote1", "12D3KooWOldPeer", ["/ip4/1.2.3.4/tcp/4001"]); 603 + 604 + // Simulate identity notification 605 + mockNet._simulateIdentityNotification({ 606 + did: "did:plc:remote1", 607 + peerId: "12D3KooWNewPeer", 608 + multiaddrs: ["/ip4/10.0.0.1/tcp/5001", "/ip4/10.0.0.2/tcp/5002"], 609 + time: new Date().toISOString(), 610 + }); 611 + 612 + // Peer info should be updated immediately 613 + const state = storage.getState("did:plc:remote1"); 614 + expect(state!.peerId).toBe("12D3KooWNewPeer"); 615 + expect(state!.peerMultiaddrs).toEqual(["/ip4/10.0.0.1/tcp/5001", "/ip4/10.0.0.2/tcp/5002"]); 616 + } finally { 617 + manager.stop(); 618 + db.close(); 619 + } 620 + }); 621 + 622 + it("identity notification for untracked DID is ignored", async () => { 623 + const db = new Database(join(tmpDir, "test-ignore.db")); 624 + const config = testConfig(tmpDir, ["did:plc:remote1"]); 625 + 626 + const { RepoManager } = await import("../repo-manager.js"); 627 + const { ReplicationManager } = await import("./replication-manager.js"); 628 + const { DidResolver } = await import("../did-resolver.js"); 629 + 630 + const repoManager = new RepoManager(db, config); 631 + repoManager.init(); 632 + 633 + const mockNet = createMockNetworkService(); 634 + const mockBlocks = createMockBlockStore(); 635 + const didResolver = new DidResolver(); 636 + 637 + const manager = new ReplicationManager( 638 + db, config, repoManager, mockBlocks, mockNet, didResolver, 639 + ); 640 + 641 + try { 642 + await manager.init(); 643 + 644 + const storage = manager.getSyncStorage(); 645 + 646 + // Simulate identity notification for untracked DID 647 + mockNet._simulateIdentityNotification({ 648 + did: "did:plc:unknown", 649 + peerId: "12D3KooWNewPeer", 650 + multiaddrs: ["/ip4/10.0.0.1/tcp/5001"], 651 + time: new Date().toISOString(), 652 + }); 653 + 654 + // No state should exist for untracked DID 655 + const state = storage.getState("did:plc:unknown"); 656 + expect(state).toBeNull(); 657 + } finally { 658 + manager.stop(); 659 + db.close(); 660 + } 661 + }); 662 + 663 + it("stop() unsubscribes from identity topics", async () => { 664 + const db = new Database(join(tmpDir, "test-unsub.db")); 665 + const config = testConfig(tmpDir, ["did:plc:remote1"]); 666 + 667 + const { RepoManager } = await import("../repo-manager.js"); 668 + const { ReplicationManager } = await import("./replication-manager.js"); 669 + const { DidResolver } = await import("../did-resolver.js"); 670 + 671 + const repoManager = new RepoManager(db, config); 672 + repoManager.init(); 673 + 674 + const mockNet = createMockNetworkService(); 675 + const mockBlocks = createMockBlockStore(); 676 + const didResolver = new DidResolver(); 677 + 678 + const manager = new ReplicationManager( 679 + db, config, repoManager, mockBlocks, mockNet, didResolver, 680 + ); 681 + 682 + try { 683 + await manager.init(); 684 + manager.stop(); 685 + 686 + expect(mockNet.unsubscribeIdentityTopics).toHaveBeenCalled(); 687 + } finally { 688 + db.close(); 689 + } 690 + }); 691 + 692 + it("publishPeerIdentity broadcasts identity notification via gossipsub", async () => { 693 + const db = new Database(join(tmpDir, "test-publish.db")); 694 + const config = testConfig(tmpDir, []); 695 + 696 + const { RepoManager } = await import("../repo-manager.js"); 697 + const { ReplicationManager } = await import("./replication-manager.js"); 698 + const { DidResolver } = await import("../did-resolver.js"); 699 + 700 + const repoManager = new RepoManager(db, config); 701 + repoManager.init(); 702 + 703 + const mockNet = createMockNetworkService(); 704 + const mockBlocks = createMockBlockStore(); 705 + const didResolver = new DidResolver(); 706 + 707 + const manager = new ReplicationManager( 708 + db, config, repoManager, mockBlocks, mockNet, didResolver, 709 + ); 710 + 711 + try { 712 + await manager.init(); 713 + 714 + // publishPeerIdentity was called during init 715 + expect(mockNet.publishIdentityNotification).toHaveBeenCalledWith( 716 + "did:plc:local", 717 + "12D3KooWMockPeer", 718 + ["/ip4/127.0.0.1/tcp/4001"], 719 + ); 720 + } finally { 721 + manager.stop(); 722 + db.close(); 723 + } 724 + }); 725 + });
+97 -4
src/replication/replication-manager.ts
··· 9 9 import type { RepoManager } from "../repo-manager.js"; 10 10 import { extractBlobCids } from "../repo-manager.js"; 11 11 import { create as createCid, CODEC_RAW, toString as cidToString } from "@atcute/cid"; 12 - import type { BlockStore, NetworkService, CommitNotification } from "../ipfs.js"; 12 + import type { BlockStore, NetworkService, CommitNotification, IdentityNotification } from "../ipfs.js"; 13 13 import type { DidResolver } from "../did-resolver.js"; 14 14 import { readCarWithRoot } from "@atproto/repo"; 15 15 import { decode as cborDecode } from "../cbor-compat.js"; ··· 70 70 private offerManager: OfferManager | null = null; 71 71 /** Per-DID last-sync timestamps (epoch ms) for policy-driven interval tracking. */ 72 72 private lastSyncTimestamps: Map<string, number> = new Map(); 73 + /** Multiaddrs at last publishPeerIdentity() call, for change detection. */ 74 + private lastPublishedMultiaddrs: string[] = []; 73 75 /** Dedup set for gossipsub notifications, keyed by `${did}:${rev}`. */ 74 76 private recentNotifications: Set<string> = new Set(); 75 77 private notificationCleanupTimer: ReturnType<typeof setInterval> | null = null; ··· 144 146 const peerId = this.networkService.getPeerId(); 145 147 if (!peerId) return; // networking disabled 146 148 149 + const multiaddrs = this.networkService.getMultiaddrs(); 147 150 const record: PeerIdentityRecord = { 148 151 $type: PEER_NSID, 149 152 peerId, 150 - multiaddrs: this.networkService.getMultiaddrs(), 153 + multiaddrs, 151 154 createdAt: new Date().toISOString(), 152 155 }; 153 156 154 157 await this.repoManager.putRecord(PEER_NSID, "self", record); 158 + this.lastPublishedMultiaddrs = multiaddrs; 159 + 160 + // Broadcast identity change via gossipsub (fire-and-forget) 161 + this.networkService.publishIdentityNotification(this.config.DID, peerId, multiaddrs).catch(() => {}); 162 + } 163 + 164 + /** 165 + * Republish peer identity if our multiaddrs have changed since the last publish. 166 + * Called at the start of syncAll() to piggyback on the existing sync loop. 167 + */ 168 + private async republishIfMultiaddrsChanged(): Promise<void> { 169 + const current = this.networkService.getMultiaddrs(); 170 + const changed = 171 + current.length !== this.lastPublishedMultiaddrs.length || 172 + current.some((addr, i) => addr !== this.lastPublishedMultiaddrs[i]); 173 + if (changed) { 174 + await this.publishPeerIdentity(); 175 + } 155 176 } 156 177 157 178 /** ··· 331 352 * - Respects per-DID sync intervals (skips DIDs not yet due) 332 353 */ 333 354 async syncAll(): Promise<void> { 355 + // Republish our identity if multiaddrs changed (e.g. after network reconnect) 356 + await this.republishIfMultiaddrsChanged(); 357 + 334 358 const dids = this.getReplicateDids(); 335 359 336 360 // Sort by priority (highest first) when policy engine is present ··· 434 458 try { 435 459 const peerInfo = await this.peerDiscovery.discoverPeer(did); 436 460 if (peerInfo) { 461 + // Detect PeerID changes (peer restarted with new identity) 462 + if (state.peerId && peerInfo.peerId && state.peerId !== peerInfo.peerId) { 463 + console.warn( 464 + `[replication] PeerID changed for ${did}: ${state.peerId} → ${peerInfo.peerId}`, 465 + ); 466 + } 437 467 this.syncStorage.updatePeerInfo(did, peerInfo.peerId, peerInfo.multiaddrs); 468 + // Merge observed multiaddrs from active libp2p connections 469 + if (peerInfo.peerId) { 470 + const observed = this.networkService.getRemoteAddrs(peerInfo.peerId); 471 + if (observed.length > 0) { 472 + const merged = [...new Set([...peerInfo.multiaddrs, ...observed])]; 473 + if (merged.length > peerInfo.multiaddrs.length) { 474 + this.syncStorage.updatePeerInfo(did, peerInfo.peerId, merged); 475 + } 476 + } 477 + } 438 478 } 439 479 } catch { 440 480 // Non-fatal: peer discovery is optional ··· 451 491 since, 452 492 ); 453 493 } catch (sourceErr) { 454 - // On failure, clear cached peer info 494 + // On failure, clear cached peer info and trigger re-discovery 455 495 this.syncStorage.clearPeerInfo(did); 496 + this.peerDiscovery.discoverPeer(did).then((peerInfo) => { 497 + if (peerInfo) { 498 + this.syncStorage.updatePeerInfo(did, peerInfo.peerId, peerInfo.multiaddrs); 499 + } 500 + }).catch(() => {}); 456 501 const err = sourceErr instanceof Error ? sourceErr : new Error(String(sourceErr)); 457 502 if (since) { 458 503 // Retry full sync from source, then fall back to peers ··· 912 957 }); 913 958 }); 914 959 960 + this.networkService.onIdentityNotification((notification) => { 961 + this.handleIdentityNotification(notification); 962 + }); 963 + 915 964 this.networkService.subscribeCommitTopics(dids); 965 + this.networkService.subscribeIdentityTopics(dids); 916 966 917 967 // Periodically clear the dedup set to prevent unbounded growth 918 968 this.notificationCleanupTimer = setInterval(() => { ··· 951 1001 } 952 1002 953 1003 /** 1004 + * Handle an incoming gossipsub identity notification. 1005 + * Updates peer info immediately if the DID is tracked. 1006 + */ 1007 + private handleIdentityNotification(notification: IdentityNotification): void { 1008 + const did = notification.did; 1009 + 1010 + // Only process DIDs we are tracking 1011 + const trackedDids = this.getReplicateDids(); 1012 + if (!trackedDids.includes(did)) return; 1013 + 1014 + // Log PeerID changes 1015 + const state = this.syncStorage.getState(did); 1016 + if (state?.peerId && state.peerId !== notification.peerId) { 1017 + console.warn( 1018 + `[replication] Identity notification: PeerID changed for ${did}: ${state.peerId} → ${notification.peerId}`, 1019 + ); 1020 + } 1021 + 1022 + // Update peer info immediately 1023 + this.syncStorage.updatePeerInfo(did, notification.peerId, notification.multiaddrs); 1024 + } 1025 + 1026 + /** 954 1027 * Stop periodic sync, verification, and firehose subscription. 955 1028 */ 956 1029 stop(): void { ··· 978 1051 this.challengeScheduler.stop(); 979 1052 this.challengeScheduler = null; 980 1053 } 981 - // Stop gossipsub notification cleanup 1054 + // Stop gossipsub notification cleanup and unsubscribe identity topics 982 1055 if (this.notificationCleanupTimer) { 983 1056 clearInterval(this.notificationCleanupTimer); 984 1057 this.notificationCleanupTimer = null; 985 1058 } 1059 + this.networkService.unsubscribeIdentityTopics(this.getReplicateDids()); 986 1060 } 987 1061 988 1062 /** ··· 1238 1312 return this.challengeStorage.getReliability(peerDid); 1239 1313 } 1240 1314 return this.challengeStorage.getAllReliability(); 1315 + } 1316 + 1317 + /** 1318 + * Refresh peer info for all DIDs associated with a PDS endpoint. 1319 + * Called when a connection to that endpoint fails (e.g. challenge transport fallback). 1320 + * Fire-and-forget: clears stale cache and triggers re-discovery. 1321 + */ 1322 + refreshPeerInfoForEndpoint(pdsEndpoint: string): void { 1323 + const states = this.syncStorage.getAllStates(); 1324 + for (const state of states) { 1325 + if (state.pdsEndpoint === pdsEndpoint) { 1326 + this.syncStorage.clearPeerInfo(state.did); 1327 + this.peerDiscovery.discoverPeer(state.did).then((peerInfo) => { 1328 + if (peerInfo) { 1329 + this.syncStorage.updatePeerInfo(state.did, peerInfo.peerId, peerInfo.multiaddrs); 1330 + } 1331 + }).catch(() => {}); 1332 + } 1333 + } 1241 1334 } 1242 1335 1243 1336 /**
+1
src/server.ts
··· 195 195 resolveEndpoint: (httpEndpoint) => syncStorage.getMultiaddrForPdsEndpoint(httpEndpoint), 196 196 onFallback: (endpoint, error) => { 197 197 console.log(pc.dim(` Challenge: libp2p failed for ${endpoint}, falling back to HTTP: ${error.message}`)); 198 + replicationManager!.refreshPeerInfoForEndpoint(endpoint); 198 199 }, 199 200 }, 200 201 );