atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 599 lines 20 kB view raw
/**
 * Peer multiaddr freshness + DID-to-PeerID staleness detection tests.
 *
 * Suites in this file: IdentityNotification encoding, PeerID change detection,
 * refreshPeerInfoForEndpoint, getRemoteAddrs, observed addr merging, and
 * identity notification integration.
 */

import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import Database from "better-sqlite3";
import {
	encode as cborEncode,
	decode as cborDecode,
} from "../cbor-compat.js";
import {
	identityTopic,
	IDENTITY_TOPIC_PREFIX,
	type IdentityNotification,
	type CommitNotification,
} from "../ipfs.js";
import type { Config } from "../config.js";

// ============================================
// Helpers
// ============================================

/**
 * Build a Config for tests: networking, firehose, rate limiting and OAuth are
 * all switched off, and data is stored under `dataDir`.
 *
 * @param dataDir - Directory used as DATA_DIR.
 * @param replicateDids - DIDs placed in REPLICATE_DIDS (defaults to none).
 */
function testConfig(dataDir: string, replicateDids: string[] = []): Config {
	return {
		DID: "did:plc:local",
		HANDLE: "test.example.com",
		PDS_HOSTNAME: "test.example.com",
		AUTH_TOKEN: "test",
		SIGNING_KEY:
			"0000000000000000000000000000000000000000000000000000000000000001",
		SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr",
		JWT_SECRET: "test",
		PASSWORD_HASH: "$2a$10$test",
		DATA_DIR: dataDir,
		PORT: 3000,
		IPFS_ENABLED: true,
		IPFS_NETWORKING: false,
		REPLICATE_DIDS: replicateDids,
		FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos",
		FIREHOSE_ENABLED: false,
		RATE_LIMIT_ENABLED: false,
		RATE_LIMIT_READ_PER_MIN: 300,
		RATE_LIMIT_SYNC_PER_MIN: 30,
		RATE_LIMIT_SESSION_PER_MIN: 10,
		RATE_LIMIT_WRITE_PER_MIN: 200,
		RATE_LIMIT_CHALLENGE_PER_MIN: 20,
		RATE_LIMIT_MAX_CONNECTIONS: 100,
		RATE_LIMIT_FIREHOSE_PER_IP: 3,
		OAUTH_ENABLED: false,
		PUBLIC_URL: "http://localhost:3000",
	};
}

type CommitHandler = (n: CommitNotification) => void | Promise<void>;
type IdentityHandler = (n: IdentityNotification) => void | Promise<void>;

/**
 * In-memory stand-in for the network service handed to ReplicationManager.
 *
 * Every service method is a `vi.fn()`, so calls can be asserted on. Registered
 * handlers, subscribed/unsubscribed DIDs and published identity notifications
 * are recorded in arrays exposed through the underscore-prefixed members,
 * together with setters for the canned multiaddrs / remote addrs.
 */
function createMockNetworkService() {
	const commitHandlers: CommitHandler[] = [];
	const identityHandlers: IdentityHandler[] = [];
	const subscribedCommitDids: string[] = [];
	const subscribedIdentityDids: string[] = [];
	const unsubscribedIdentityDids: string[] = [];
	const publishedIdentity: IdentityNotification[] = [];
	// Reassigned wholesale by _setMultiaddrs, hence `let`.
	let multiaddrs = ["/ip4/127.0.0.1/tcp/4001"];
	// Only mutated in place by _setRemoteAddrs, never reassigned.
	const remoteAddrsMap: Record<string, string[]> = {};

	return {
		provideBlocks: vi.fn().mockResolvedValue(undefined),
		publishCommitNotification: vi.fn().mockResolvedValue(undefined),
		onCommitNotification: vi.fn().mockImplementation(
			(handler: CommitHandler) => {
				commitHandlers.push(handler);
			},
		),
		subscribeCommitTopics: vi.fn().mockImplementation(
			(dids: string[]) => { subscribedCommitDids.push(...dids); },
		),
		unsubscribeCommitTopics: vi.fn(),
		publishIdentityNotification: vi.fn().mockImplementation(
			async (did: string, peerId: string, addrs: string[]) => {
				publishedIdentity.push({ did, peerId, multiaddrs: addrs, time: new Date().toISOString() });
			},
		),
		onIdentityNotification: vi.fn().mockImplementation(
			(handler: IdentityHandler) => { identityHandlers.push(handler); },
		),
		subscribeIdentityTopics: vi.fn().mockImplementation(
			(dids: string[]) => { subscribedIdentityDids.push(...dids); },
		),
		unsubscribeIdentityTopics: vi.fn().mockImplementation(
			(dids: string[]) => { unsubscribedIdentityDids.push(...dids); },
		),
		getPeerId: vi.fn().mockReturnValue("12D3KooWMockPeer"),
		getMultiaddrs: vi.fn().mockImplementation(() => multiaddrs),
		getConnectionCount: vi.fn().mockReturnValue(0),
		getRemoteAddrs: vi.fn().mockImplementation((peerId: string) => remoteAddrsMap[peerId] ?? []),
		provideForDid: vi.fn().mockResolvedValue(undefined),
		findProvidersForDid: vi.fn().mockResolvedValue([]),
		// Test helpers
		_commitHandlers: commitHandlers,
		_identityHandlers: identityHandlers,
		_subscribedCommitDids: subscribedCommitDids,
		_subscribedIdentityDids: subscribedIdentityDids,
		_unsubscribedIdentityDids: unsubscribedIdentityDids,
		_publishedIdentity: publishedIdentity,
		_setMultiaddrs(addrs: string[]) { multiaddrs = addrs; },
		_setRemoteAddrs(peerId: string, addrs: string[]) { remoteAddrsMap[peerId] = addrs; },
		/**
		 * Deliver `notification` to every registered identity handler, in
		 * registration order. Handlers are invoked synchronously and any promise
		 * they return is intentionally not awaited.
		 */
		_simulateIdentityNotification(notification: IdentityNotification) {
			for (const handler of identityHandlers) {
				handler(notification);
			}
		},
	};
}

/** Block store mock whose methods all resolve to "nothing stored" values. */
function createMockBlockStore() {
	return {
		putBlock: vi.fn().mockResolvedValue(undefined),
		getBlock: vi.fn().mockResolvedValue(null),
		hasBlock: vi.fn().mockResolvedValue(false),
		putBlocks: vi.fn().mockResolvedValue(undefined),
		deleteBlock: vi.fn().mockResolvedValue(undefined),
	};
}

/**
 * Wire a ReplicationManager to a fresh SQLite database in `dataDir` plus the
 * mock network service and block store. `manager.init()` is NOT called; the
 * caller owns `manager.stop()` and `db.close()`.
 *
 * @param dataDir - Temp directory that holds the database and acts as DATA_DIR.
 * @param replicateDids - DIDs to put in the config's REPLICATE_DIDS.
 * @param dbFile - Database file name inside `dataDir`.
 * @returns The open database, the manager and the mock network service.
 */
async function createTestManager(
	dataDir: string,
	replicateDids: string[],
	dbFile = "test.db",
) {
	const db = new Database(join(dataDir, dbFile));

	try {
		const config = testConfig(dataDir, replicateDids);

		const { RepoManager } = await import("../repo-manager.js");
		const { ReplicationManager } = await import("./replication-manager.js");
		const { DidResolver } = await import("../did-resolver.js");

		const repoManager = new RepoManager(db, config);
		repoManager.init();

		const mockNet = createMockNetworkService();
		const mockBlocks = createMockBlockStore();
		const didResolver = new DidResolver();

		const manager = new ReplicationManager(
			db, config, repoManager, mockBlocks, mockNet, didResolver,
		);

		return { db, manager, mockNet };
	} catch (err) {
		// Do not leak the open database handle if wiring fails.
		db.close();
		throw err;
	}
}

// ============================================
// IdentityNotification encoding + topic tests
// ============================================

describe("IdentityNotification encoding", () => {
	it("CBOR encode/decode round-trip", () => {
		const notification: IdentityNotification = {
			did: "did:plc:abc123",
			peerId: "12D3KooWTest",
			multiaddrs: ["/ip4/127.0.0.1/tcp/4001", "/ip4/192.168.1.1/tcp/4001"],
			time: "2024-01-01T00:00:00.000Z",
		};

		const encoded = cborEncode(notification);
		expect(encoded).toBeInstanceOf(Uint8Array);
		expect(encoded.length).toBeGreaterThan(0);

		const decoded = cborDecode(encoded) as IdentityNotification;
		expect(decoded.did).toBe(notification.did);
		expect(decoded.peerId).toBe(notification.peerId);
		expect(decoded.multiaddrs).toEqual(notification.multiaddrs);
		expect(decoded.time).toBe(notification.time);
	});

	it("identityTopic() produces correct topic strings", () => {
		const did = "did:plc:abc123";
		const topic = identityTopic(did);
		expect(topic).toBe("/p2pds/identity/1/did:plc:abc123");
		expect(topic.startsWith(IDENTITY_TOPIC_PREFIX)).toBe(true);
	});

	it("different DIDs produce different identity topics", () => {
		const topic1 = identityTopic("did:plc:aaa");
		const topic2 = identityTopic("did:plc:bbb");
		expect(topic1).not.toBe(topic2);
	});
});

// ============================================
// PeerID change detection
// ============================================

describe("PeerID change detection", () => {
	let tmpDir: string;

	beforeEach(() => {
		tmpDir = mkdtempSync(join(tmpdir(), "peer-freshness-test-"));
	});

	afterEach(() => {
		// Restore spies here rather than at the end of the test body, so a failed
		// assertion cannot leave prototype/console mocks active for later tests.
		vi.restoreAllMocks();
		rmSync(tmpDir, { recursive: true, force: true });
	});

	it("logs warning when PeerID changes during syncDid()", async () => {
		const { db, manager } = await createTestManager(tmpDir, ["did:plc:remote1"]);

		try {
			await manager.init();

			// Seed sync state with an existing peerId
			const storage = manager.getSyncStorage();
			storage.upsertState({ did: "did:plc:remote1", pdsEndpoint: "https://pds.example.com" });
			storage.updatePeerInfo("did:plc:remote1", "12D3KooWOldPeer", ["/ip4/1.2.3.4/tcp/4001"]);
			// Set peerInfoFetchedAt to null so shouldRefreshPeerInfo returns true
			db.prepare("UPDATE replication_state SET peer_info_fetched_at = NULL WHERE did = ?")
				.run("did:plc:remote1");

			// Mock peer discovery to return a new PeerID
			const { PeerDiscovery } = await import("./peer-discovery.js");
			vi.spyOn(PeerDiscovery.prototype, "discoverPeer").mockResolvedValue({
				pdsEndpoint: "https://pds.example.com",
				peerId: "12D3KooWNewPeer",
				multiaddrs: ["/ip4/5.6.7.8/tcp/4001"],
				endpoint: null,
			});

			// Mock repo fetcher
			const { RepoFetcher } = await import("./repo-fetcher.js");
			vi.spyOn(RepoFetcher.prototype, "resolvePds").mockResolvedValue("https://pds.example.com");
			vi.spyOn(RepoFetcher.prototype, "fetchRepo").mockRejectedValue(new Error("test skip"));

			const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});

			try {
				await manager.syncDid("did:plc:remote1");
			} catch {
				// Expected to fail since fetchRepo is mocked to reject
			}

			// Check that PeerID change warning was logged
			const peerIdChangeWarning = warnSpy.mock.calls.find(
				(call) => typeof call[0] === "string" && call[0].includes("PeerID changed"),
			);
			expect(peerIdChangeWarning).toBeDefined();
			expect(peerIdChangeWarning?.[0]).toContain("12D3KooWOldPeer");
			expect(peerIdChangeWarning?.[0]).toContain("12D3KooWNewPeer");
		} finally {
			manager.stop();
			db.close();
		}
	});
});

// ============================================
// refreshPeerInfoForEndpoint
// ============================================

describe("refreshPeerInfoForEndpoint", () => {
	let tmpDir: string;

	beforeEach(() => {
		tmpDir = mkdtempSync(join(tmpdir(), "peer-refresh-test-"));
	});

	afterEach(() => {
		// Guaranteed spy cleanup, even when an assertion in the test throws.
		vi.restoreAllMocks();
		rmSync(tmpDir, { recursive: true, force: true });
	});

	it("clears stale peer info and triggers re-discovery for matching DIDs", async () => {
		const { db, manager } = await createTestManager(
			tmpDir, ["did:plc:remote1", "did:plc:remote2"],
		);
		const { PeerDiscovery } = await import("./peer-discovery.js");

		try {
			await manager.init();

			const storage = manager.getSyncStorage();
			// Set up two DIDs with different endpoints
			storage.upsertState({ did: "did:plc:remote1", pdsEndpoint: "https://pds1.example.com" });
			storage.updatePeerInfo("did:plc:remote1", "12D3KooWPeer1", ["/ip4/1.2.3.4/tcp/4001"]);

			storage.upsertState({ did: "did:plc:remote2", pdsEndpoint: "https://pds2.example.com" });
			storage.updatePeerInfo("did:plc:remote2", "12D3KooWPeer2", ["/ip4/5.6.7.8/tcp/4001"]);

			const discoverSpy = vi.spyOn(PeerDiscovery.prototype, "discoverPeer").mockResolvedValue({
				pdsEndpoint: "https://pds1.example.com",
				peerId: "12D3KooWNewPeer1",
				multiaddrs: ["/ip4/10.0.0.1/tcp/4001"],
				endpoint: null,
			});

			// Refresh for pds1 endpoint only
			manager.refreshPeerInfoForEndpoint("https://pds1.example.com");

			// Wait for async discovery to complete. A fixed delay is kept on
			// purpose: the negative assertion below needs time to be meaningful.
			await new Promise((resolve) => setTimeout(resolve, 200));

			// discoverPeer should only have been called for remote1
			expect(discoverSpy).toHaveBeenCalledWith("did:plc:remote1");
			expect(discoverSpy).not.toHaveBeenCalledWith("did:plc:remote2");

			// remote1 should have new peer info
			const state1 = storage.getState("did:plc:remote1");
			expect(state1?.peerId).toBe("12D3KooWNewPeer1");

			// remote2 should be untouched
			const state2 = storage.getState("did:plc:remote2");
			expect(state2?.peerId).toBe("12D3KooWPeer2");
		} finally {
			manager.stop();
			db.close();
		}
	});
});

// ============================================
// getRemoteAddrs
// ============================================

describe("getRemoteAddrs", () => {
	it("returns empty array for unknown peer", () => {
		const mockNet = createMockNetworkService();
		const result = mockNet.getRemoteAddrs("12D3KooWUnknown");
		expect(result).toEqual([]);
	});

	it("returns configured addrs for known peer", () => {
		const mockNet = createMockNetworkService();
		mockNet._setRemoteAddrs("12D3KooWKnown", ["/ip4/1.2.3.4/tcp/5555"]);
		const result = mockNet.getRemoteAddrs("12D3KooWKnown");
		expect(result).toEqual(["/ip4/1.2.3.4/tcp/5555"]);
	});
});

// ============================================
// Observed addrs merged into stored addrs
// ============================================

describe("observed addr merging", () => {
	let tmpDir: string;

	beforeEach(() => {
		tmpDir = mkdtempSync(join(tmpdir(), "peer-merge-test-"));
	});

	afterEach(() => {
		// Guaranteed spy cleanup, even when an assertion in the test throws.
		vi.restoreAllMocks();
		rmSync(tmpDir, { recursive: true, force: true });
	});

	it("merges observed addrs into stored addrs during syncDid", async () => {
		const { db, manager, mockNet } = await createTestManager(tmpDir, ["did:plc:remote1"]);
		const { PeerDiscovery } = await import("./peer-discovery.js");
		const { RepoFetcher } = await import("./repo-fetcher.js");

		try {
			await manager.init();

			const storage = manager.getSyncStorage();
			storage.upsertState({ did: "did:plc:remote1", pdsEndpoint: "https://pds.example.com" });
			// Clear peer_info_fetched_at so refresh triggers
			db.prepare("UPDATE replication_state SET peer_info_fetched_at = NULL WHERE did = ?")
				.run("did:plc:remote1");

			// Discovery returns peerId + one addr
			vi.spyOn(PeerDiscovery.prototype, "discoverPeer").mockResolvedValue({
				pdsEndpoint: "https://pds.example.com",
				peerId: "12D3KooWPeerX",
				multiaddrs: ["/ip4/1.2.3.4/tcp/4001"],
				endpoint: null,
			});

			// Active connection has an additional observed addr
			mockNet._setRemoteAddrs("12D3KooWPeerX", ["/ip4/5.6.7.8/tcp/9999"]);

			// Mock fetchRepo to reject so we can inspect state after peer discovery
			vi.spyOn(RepoFetcher.prototype, "resolvePds").mockResolvedValue("https://pds.example.com");
			vi.spyOn(RepoFetcher.prototype, "fetchRepo").mockRejectedValue(new Error("test skip"));

			// Spy on updatePeerInfo to capture the merged call
			const updateSpy = vi.spyOn(storage, "updatePeerInfo");

			try {
				await manager.syncDid("did:plc:remote1");
			} catch {
				// Expected
			}

			// updatePeerInfo should have been called with the merged addrs
			// (before clearPeerInfo in the error handler wipes them)
			const mergedCall = updateSpy.mock.calls.find(
				(call) => call[2]?.length === 2,
			);
			expect(mergedCall).toBeDefined();
			expect(mergedCall?.[2]).toContain("/ip4/1.2.3.4/tcp/4001");
			expect(mergedCall?.[2]).toContain("/ip4/5.6.7.8/tcp/9999");
		} finally {
			manager.stop();
			db.close();
		}
	});
});

// ============================================
// Identity notification integration
// ============================================

describe("Identity notification integration", () => {
	let tmpDir: string;

	beforeEach(() => {
		tmpDir = mkdtempSync(join(tmpdir(), "peer-identity-test-"));
	});

	afterEach(() => {
		rmSync(tmpDir, { recursive: true, force: true });
	});

	it("init() subscribes to identity topics for tracked DIDs", async () => {
		const { db, manager, mockNet } = await createTestManager(
			tmpDir, ["did:plc:remote1", "did:plc:remote2"],
		);

		try {
			await manager.init();

			expect(mockNet.subscribeIdentityTopics).toHaveBeenCalled();
			expect(mockNet._subscribedIdentityDids).toContain("did:plc:remote1");
			expect(mockNet._subscribedIdentityDids).toContain("did:plc:remote2");

			expect(mockNet.onIdentityNotification).toHaveBeenCalled();
			expect(mockNet._identityHandlers.length).toBeGreaterThan(0);
		} finally {
			manager.stop();
			db.close();
		}
	});

	it("identity notification updates peer info immediately", async () => {
		const { db, manager, mockNet } = await createTestManager(
			tmpDir, ["did:plc:remote1"], "test-update.db",
		);

		try {
			await manager.init();

			const storage = manager.getSyncStorage();
			storage.upsertState({ did: "did:plc:remote1", pdsEndpoint: "https://pds.example.com" });
			storage.updatePeerInfo("did:plc:remote1", "12D3KooWOldPeer", ["/ip4/1.2.3.4/tcp/4001"]);

			// Simulate identity notification
			mockNet._simulateIdentityNotification({
				did: "did:plc:remote1",
				peerId: "12D3KooWNewPeer",
				multiaddrs: ["/ip4/10.0.0.1/tcp/5001", "/ip4/10.0.0.2/tcp/5002"],
				time: new Date().toISOString(),
			});

			// Peer info should be updated immediately
			const state = storage.getState("did:plc:remote1");
			expect(state?.peerId).toBe("12D3KooWNewPeer");
			expect(state?.peerMultiaddrs).toEqual(["/ip4/10.0.0.1/tcp/5001", "/ip4/10.0.0.2/tcp/5002"]);
		} finally {
			manager.stop();
			db.close();
		}
	});

	it("identity notification for untracked DID is ignored", async () => {
		const { db, manager, mockNet } = await createTestManager(
			tmpDir, ["did:plc:remote1"], "test-ignore.db",
		);

		try {
			await manager.init();

			const storage = manager.getSyncStorage();

			// Simulate identity notification for untracked DID
			mockNet._simulateIdentityNotification({
				did: "did:plc:unknown",
				peerId: "12D3KooWNewPeer",
				multiaddrs: ["/ip4/10.0.0.1/tcp/5001"],
				time: new Date().toISOString(),
			});

			// No state should exist for untracked DID
			const state = storage.getState("did:plc:unknown");
			expect(state).toBeNull();
		} finally {
			manager.stop();
			db.close();
		}
	});

	it("stop() unsubscribes from identity topics", async () => {
		const { db, manager, mockNet } = await createTestManager(
			tmpDir, ["did:plc:remote1"], "test-unsub.db",
		);

		try {
			await manager.init();
			// stop() is the behavior under test, so it is called here rather than
			// in `finally`.
			manager.stop();

			expect(mockNet.unsubscribeIdentityTopics).toHaveBeenCalled();
		} finally {
			db.close();
		}
	});
});