atproto user agency toolkit for individuals and groups
1/**
2 * Peer multiaddr freshness + DID-to-PeerID staleness detection tests.
3 *
4 * Tests: PeerID change detection, refreshPeerInfoForEndpoint,
5 * republish on multiaddr change, getRemoteAddrs, identity notifications.
6 */
7
8import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
9import { mkdtempSync, rmSync } from "node:fs";
10import { tmpdir } from "node:os";
11import { join } from "node:path";
12import Database from "better-sqlite3";
13import {
14 encode as cborEncode,
15 decode as cborDecode,
16} from "../cbor-compat.js";
17import {
18 identityTopic,
19 IDENTITY_TOPIC_PREFIX,
20 type IdentityNotification,
21 type CommitNotification,
22} from "../ipfs.js";
23import type { Config } from "../config.js";
24
25// ============================================
26// Helpers
27// ============================================
28
/**
 * Build the fixed Config used by every test in this file.
 *
 * Only the data directory and the replicated-DID list vary per test; all
 * other fields are hard-coded placeholders, with networking, the firehose,
 * rate limiting and OAuth switched off.
 *
 * @param dataDir - Directory stored in `DATA_DIR`.
 * @param replicateDids - DIDs stored in `REPLICATE_DIDS` (defaults to none).
 * @returns A fully populated Config object.
 */
function testConfig(dataDir: string, replicateDids: string[] = []): Config {
  // Values that appear more than once are named so the repetition is explicit.
  const hostname = "test.example.com";
  const placeholderSecret = "test";

  const config: Config = {
    // Identity and credentials
    DID: "did:plc:local",
    HANDLE: hostname,
    PDS_HOSTNAME: hostname,
    AUTH_TOKEN: placeholderSecret,
    SIGNING_KEY:
      "0000000000000000000000000000000000000000000000000000000000000001",
    SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr",
    JWT_SECRET: placeholderSecret,
    PASSWORD_HASH: "$2a$10$test",

    // Storage, server and replication
    DATA_DIR: dataDir,
    PORT: 3000,
    IPFS_ENABLED: true,
    IPFS_NETWORKING: false,
    REPLICATE_DIDS: replicateDids,
    FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos",
    FIREHOSE_ENABLED: false,

    // Rate limiting (disabled, but the limits are still populated)
    RATE_LIMIT_ENABLED: false,
    RATE_LIMIT_READ_PER_MIN: 300,
    RATE_LIMIT_SYNC_PER_MIN: 30,
    RATE_LIMIT_SESSION_PER_MIN: 10,
    RATE_LIMIT_WRITE_PER_MIN: 200,
    RATE_LIMIT_CHALLENGE_PER_MIN: 20,
    RATE_LIMIT_MAX_CONNECTIONS: 100,
    RATE_LIMIT_FIREHOSE_PER_IP: 3,

    // OAuth
    OAUTH_ENABLED: false,
    PUBLIC_URL: "http://localhost:3000",
  };

  return config;
}
58
/** Callback shape registered through the mock's `onIdentityNotification`. */
type IdentityHandler = (n: IdentityNotification) => void | Promise<void>;

/**
 * Build an in-memory stand-in for the network service used by these tests.
 *
 * Every service method is a `vi.fn()` so calls can be asserted on. The
 * subscribe/unsubscribe/publish/on* mocks additionally record their arguments
 * in arrays that are exposed through the underscore-prefixed properties, and
 * the `_set*` / `_simulate*` helpers let a test drive the mock's state.
 *
 * @returns The mock service together with its test-only helpers.
 */
function createMockNetworkService() {
  const commitHandlers: Array<(n: CommitNotification) => void | Promise<void>> = [];
  const identityHandlers: IdentityHandler[] = [];
  const subscribedCommitDids: string[] = [];
  const subscribedIdentityDids: string[] = [];
  const unsubscribedIdentityDids: string[] = [];
  const publishedIdentity: IdentityNotification[] = [];
  let multiaddrs = ["/ip4/127.0.0.1/tcp/4001"];
  // A Map (rather than a plain object) so that peer IDs such as "constructor"
  // can never resolve to inherited Object.prototype members.
  const remoteAddrsByPeer = new Map<string, string[]>();

  return {
    provideBlocks: vi.fn().mockResolvedValue(undefined),
    publishCommitNotification: vi.fn().mockResolvedValue(undefined),
    onCommitNotification: vi.fn().mockImplementation(
      (handler: (n: CommitNotification) => void | Promise<void>) => {
        commitHandlers.push(handler);
      },
    ),
    subscribeCommitTopics: vi.fn().mockImplementation(
      (dids: string[]) => { subscribedCommitDids.push(...dids); },
    ),
    unsubscribeCommitTopics: vi.fn(),
    publishIdentityNotification: vi.fn().mockImplementation(
      async (did: string, peerId: string, addrs: string[]) => {
        publishedIdentity.push({ did, peerId, multiaddrs: addrs, time: new Date().toISOString() });
      },
    ),
    onIdentityNotification: vi.fn().mockImplementation(
      (handler: IdentityHandler) => { identityHandlers.push(handler); },
    ),
    subscribeIdentityTopics: vi.fn().mockImplementation(
      (dids: string[]) => { subscribedIdentityDids.push(...dids); },
    ),
    unsubscribeIdentityTopics: vi.fn().mockImplementation(
      (dids: string[]) => { unsubscribedIdentityDids.push(...dids); },
    ),
    getPeerId: vi.fn().mockReturnValue("12D3KooWMockPeer"),
    getMultiaddrs: vi.fn().mockImplementation(() => multiaddrs),
    getConnectionCount: vi.fn().mockReturnValue(0),
    getRemoteAddrs: vi.fn().mockImplementation(
      (peerId: string) => remoteAddrsByPeer.get(peerId) ?? [],
    ),
    provideForDid: vi.fn().mockResolvedValue(undefined),
    findProvidersForDid: vi.fn().mockResolvedValue([]),

    // Test helpers
    _commitHandlers: commitHandlers,
    _identityHandlers: identityHandlers,
    _subscribedCommitDids: subscribedCommitDids,
    _subscribedIdentityDids: subscribedIdentityDids,
    _unsubscribedIdentityDids: unsubscribedIdentityDids,
    _publishedIdentity: publishedIdentity,

    /** Replace the list returned by `getMultiaddrs()`. */
    _setMultiaddrs(addrs: string[]) { multiaddrs = addrs; },

    /** Set the list returned by `getRemoteAddrs(peerId)`. */
    _setRemoteAddrs(peerId: string, addrs: string[]) { remoteAddrsByPeer.set(peerId, addrs); },

    /**
     * Deliver a notification to every registered identity handler, in
     * registration order.
     *
     * All handlers are invoked synchronously before this method returns, and
     * a handler that throws synchronously still throws out of this call. The
     * returned promise settles once every handler's own promise has settled,
     * so a test with async handlers can `await` it instead of letting those
     * promises float; callers that ignore the return value behave as before.
     */
    _simulateIdentityNotification(notification: IdentityNotification): Promise<void> {
      const pending: Array<void | Promise<void>> = [];
      for (const handler of identityHandlers) {
        pending.push(handler(notification));
      }
      return Promise.all(pending).then(() => undefined);
    },
  };
}
119
/**
 * Build a block-store mock whose methods are all `vi.fn()` stubs.
 *
 * Writes and deletes resolve to `undefined`, `getBlock` resolves to `null`
 * and `hasBlock` resolves to `false`, i.e. the store always looks empty.
 */
function createMockBlockStore() {
  const putBlock = vi.fn().mockResolvedValue(undefined);
  const getBlock = vi.fn().mockResolvedValue(null);
  const hasBlock = vi.fn().mockResolvedValue(false);
  const putBlocks = vi.fn().mockResolvedValue(undefined);
  const deleteBlock = vi.fn().mockResolvedValue(undefined);

  return { putBlock, getBlock, hasBlock, putBlocks, deleteBlock };
}
129
130// ============================================
131// IdentityNotification encoding + topic tests
132// ============================================
133
// Checks that IdentityNotification survives a CBOR round-trip and that
// identityTopic() builds the expected per-DID topic string.
describe("IdentityNotification encoding", () => {
  it("CBOR encode/decode round-trip", () => {
    const original: IdentityNotification = {
      did: "did:plc:abc123",
      peerId: "12D3KooWTest",
      multiaddrs: ["/ip4/127.0.0.1/tcp/4001", "/ip4/192.168.1.1/tcp/4001"],
      time: "2024-01-01T00:00:00.000Z",
    };

    // Encoding must yield a non-empty byte array.
    const bytes = cborEncode(original);
    expect(bytes).toBeInstanceOf(Uint8Array);
    expect(bytes.length).toBeGreaterThan(0);

    // Decoding must give back every field unchanged.
    const roundTripped = cborDecode(bytes) as IdentityNotification;
    expect(roundTripped.did).toBe(original.did);
    expect(roundTripped.peerId).toBe(original.peerId);
    expect(roundTripped.multiaddrs).toEqual(original.multiaddrs);
    expect(roundTripped.time).toBe(original.time);
  });

  it("identityTopic() produces correct topic strings", () => {
    const topic = identityTopic("did:plc:abc123");

    expect(topic).toBe("/p2pds/identity/1/did:plc:abc123");
    expect(topic.startsWith(IDENTITY_TOPIC_PREFIX)).toBe(true);
  });

  it("different DIDs produce different identity topics", () => {
    const first = identityTopic("did:plc:aaa");
    const second = identityTopic("did:plc:bbb");

    expect(first).not.toBe(second);
  });
});
167
168// ============================================
169// PeerID change detection
170// ============================================
171
// Verifies that syncDid() warns when peer discovery reports a PeerID that
// differs from the one already stored for the DID.
describe("PeerID change detection", () => {
  let tmpDir: string;

  beforeEach(() => {
    tmpDir = mkdtempSync(join(tmpdir(), "peer-freshness-test-"));
  });

  afterEach(() => {
    // Restore spies here rather than at the end of the test body: a failing
    // expect() would otherwise skip the restore and leave the prototype and
    // console.warn spies installed for every later test.
    vi.restoreAllMocks();
    rmSync(tmpDir, { recursive: true, force: true });
  });

  it("logs warning when PeerID changes during syncDid()", async () => {
    const db = new Database(join(tmpDir, "test.db"));
    const config = testConfig(tmpDir, ["did:plc:remote1"]);

    const { RepoManager } = await import("../repo-manager.js");
    const { ReplicationManager } = await import("./replication-manager.js");
    const { DidResolver } = await import("../did-resolver.js");

    const repoManager = new RepoManager(db, config);
    repoManager.init();

    const mockNet = createMockNetworkService();
    const mockBlocks = createMockBlockStore();
    const didResolver = new DidResolver();

    const manager = new ReplicationManager(
      db, config, repoManager, mockBlocks, mockNet, didResolver,
    );

    try {
      await manager.init();

      // Seed sync state with an existing peerId
      const storage = manager.getSyncStorage();
      storage.upsertState({ did: "did:plc:remote1", pdsEndpoint: "https://pds.example.com" });
      storage.updatePeerInfo("did:plc:remote1", "12D3KooWOldPeer", ["/ip4/1.2.3.4/tcp/4001"]);
      // Set peerInfoFetchedAt to null so shouldRefreshPeerInfo returns true
      db.prepare("UPDATE replication_state SET peer_info_fetched_at = NULL WHERE did = ?")
        .run("did:plc:remote1");

      // Mock peer discovery to return a new PeerID
      const { PeerDiscovery } = await import("./peer-discovery.js");
      vi.spyOn(PeerDiscovery.prototype, "discoverPeer").mockResolvedValue({
        pdsEndpoint: "https://pds.example.com",
        peerId: "12D3KooWNewPeer",
        multiaddrs: ["/ip4/5.6.7.8/tcp/4001"],
        endpoint: null,
      });

      // Mock repo fetcher
      const { RepoFetcher } = await import("./repo-fetcher.js");
      vi.spyOn(RepoFetcher.prototype, "resolvePds").mockResolvedValue("https://pds.example.com");
      vi.spyOn(RepoFetcher.prototype, "fetchRepo").mockRejectedValue(new Error("test skip"));

      // Silence console.warn while capturing its calls for the assertions below.
      const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});

      try {
        await manager.syncDid("did:plc:remote1");
      } catch {
        // Expected to fail since fetchRepo is mocked to reject
      }

      // Check that PeerID change warning was logged
      const peerIdChangeWarning = warnSpy.mock.calls.find(
        (call) => typeof call[0] === "string" && call[0].includes("PeerID changed"),
      );
      expect(peerIdChangeWarning).toBeDefined();
      expect(peerIdChangeWarning![0]).toContain("12D3KooWOldPeer");
      expect(peerIdChangeWarning![0]).toContain("12D3KooWNewPeer");
    } finally {
      manager.stop();
      db.close();
    }
  });
});
251
252// ============================================
253// refreshPeerInfoForEndpoint
254// ============================================
255
// Verifies that refreshPeerInfoForEndpoint() re-discovers peers only for the
// DIDs whose stored PDS endpoint matches the one passed in.
describe("refreshPeerInfoForEndpoint", () => {
  let tmpDir: string;

  beforeEach(() => {
    tmpDir = mkdtempSync(join(tmpdir(), "peer-refresh-test-"));
  });

  afterEach(() => {
    // Restore the discoverPeer spy even when an assertion fails, so the
    // patched PeerDiscovery.prototype cannot leak into later tests.
    vi.restoreAllMocks();
    rmSync(tmpDir, { recursive: true, force: true });
  });

  it("clears stale peer info and triggers re-discovery for matching DIDs", async () => {
    const db = new Database(join(tmpDir, "test.db"));
    const config = testConfig(tmpDir, ["did:plc:remote1", "did:plc:remote2"]);

    const { RepoManager } = await import("../repo-manager.js");
    const { ReplicationManager } = await import("./replication-manager.js");
    const { DidResolver } = await import("../did-resolver.js");
    const { PeerDiscovery } = await import("./peer-discovery.js");

    const repoManager = new RepoManager(db, config);
    repoManager.init();

    const mockNet = createMockNetworkService();
    const mockBlocks = createMockBlockStore();
    const didResolver = new DidResolver();

    const manager = new ReplicationManager(
      db, config, repoManager, mockBlocks, mockNet, didResolver,
    );

    try {
      await manager.init();

      const storage = manager.getSyncStorage();
      // Set up two DIDs with different endpoints
      storage.upsertState({ did: "did:plc:remote1", pdsEndpoint: "https://pds1.example.com" });
      storage.updatePeerInfo("did:plc:remote1", "12D3KooWPeer1", ["/ip4/1.2.3.4/tcp/4001"]);

      storage.upsertState({ did: "did:plc:remote2", pdsEndpoint: "https://pds2.example.com" });
      storage.updatePeerInfo("did:plc:remote2", "12D3KooWPeer2", ["/ip4/5.6.7.8/tcp/4001"]);

      const discoverSpy = vi.spyOn(PeerDiscovery.prototype, "discoverPeer").mockResolvedValue({
        pdsEndpoint: "https://pds1.example.com",
        peerId: "12D3KooWNewPeer1",
        multiaddrs: ["/ip4/10.0.0.1/tcp/4001"],
        endpoint: null,
      });

      // Refresh for pds1 endpoint only
      manager.refreshPeerInfoForEndpoint("https://pds1.example.com");

      // The call above is not awaited, so give the async discovery time to finish.
      await new Promise((r) => setTimeout(r, 200));

      // discoverPeer should only have been called for remote1
      expect(discoverSpy).toHaveBeenCalledWith("did:plc:remote1");
      expect(discoverSpy).not.toHaveBeenCalledWith("did:plc:remote2");

      // remote1 should have new peer info
      const state1 = storage.getState("did:plc:remote1");
      expect(state1!.peerId).toBe("12D3KooWNewPeer1");

      // remote2 should be untouched
      const state2 = storage.getState("did:plc:remote2");
      expect(state2!.peerId).toBe("12D3KooWPeer2");
    } finally {
      manager.stop();
      db.close();
    }
  });
});
330
331// ============================================
332// getRemoteAddrs
333// ============================================
334
// Sanity checks for the mock's getRemoteAddrs(): empty by default, and it
// returns whatever _setRemoteAddrs() stored for that peer.
describe("getRemoteAddrs", () => {
  it("returns empty array for unknown peer", () => {
    const net = createMockNetworkService();

    expect(net.getRemoteAddrs("12D3KooWUnknown")).toEqual([]);
  });

  it("returns configured addrs for known peer", () => {
    const knownPeer = "12D3KooWKnown";
    const net = createMockNetworkService();
    net._setRemoteAddrs(knownPeer, ["/ip4/1.2.3.4/tcp/5555"]);

    const addrs = net.getRemoteAddrs(knownPeer);

    expect(addrs).toEqual(["/ip4/1.2.3.4/tcp/5555"]);
  });
});
349
350// ============================================
351// Observed addrs merged into stored addrs
352// ============================================
353
// Verifies that syncDid() stores the discovered multiaddrs together with the
// addresses observed on the live connection to the same peer.
describe("observed addr merging", () => {
  let tmpDir: string;

  beforeEach(() => {
    tmpDir = mkdtempSync(join(tmpdir(), "peer-merge-test-"));
  });

  afterEach(() => {
    // Restore spies here so a failing assertion cannot leave the prototype
    // spies (or the updatePeerInfo spy) installed for later tests.
    vi.restoreAllMocks();
    rmSync(tmpDir, { recursive: true, force: true });
  });

  it("merges observed addrs into stored addrs during syncDid", async () => {
    const db = new Database(join(tmpDir, "test.db"));
    const config = testConfig(tmpDir, ["did:plc:remote1"]);

    const { RepoManager } = await import("../repo-manager.js");
    const { ReplicationManager } = await import("./replication-manager.js");
    const { DidResolver } = await import("../did-resolver.js");
    const { PeerDiscovery } = await import("./peer-discovery.js");
    const { RepoFetcher } = await import("./repo-fetcher.js");

    const repoManager = new RepoManager(db, config);
    repoManager.init();

    const mockNet = createMockNetworkService();
    const mockBlocks = createMockBlockStore();
    const didResolver = new DidResolver();

    const manager = new ReplicationManager(
      db, config, repoManager, mockBlocks, mockNet, didResolver,
    );

    try {
      await manager.init();

      const storage = manager.getSyncStorage();
      storage.upsertState({ did: "did:plc:remote1", pdsEndpoint: "https://pds.example.com" });
      // Clear peer_info_fetched_at so refresh triggers
      db.prepare("UPDATE replication_state SET peer_info_fetched_at = NULL WHERE did = ?")
        .run("did:plc:remote1");

      // Discovery returns peerId + one addr
      vi.spyOn(PeerDiscovery.prototype, "discoverPeer").mockResolvedValue({
        pdsEndpoint: "https://pds.example.com",
        peerId: "12D3KooWPeerX",
        multiaddrs: ["/ip4/1.2.3.4/tcp/4001"],
        endpoint: null,
      });

      // Active connection has an additional observed addr
      mockNet._setRemoteAddrs("12D3KooWPeerX", ["/ip4/5.6.7.8/tcp/9999"]);

      // Mock fetchRepo to reject so we can inspect state after peer discovery
      vi.spyOn(RepoFetcher.prototype, "resolvePds").mockResolvedValue("https://pds.example.com");
      vi.spyOn(RepoFetcher.prototype, "fetchRepo").mockRejectedValue(new Error("test skip"));

      // Spy on updatePeerInfo to capture the merged call
      const updateSpy = vi.spyOn(storage, "updatePeerInfo");

      try {
        await manager.syncDid("did:plc:remote1");
      } catch {
        // Expected
      }

      // updatePeerInfo should have been called with the merged addrs
      // (before clearPeerInfo in the error handler wipes them)
      const mergedCall = updateSpy.mock.calls.find(
        (call) => call[2] && (call[2] as string[]).length === 2,
      );
      expect(mergedCall).toBeDefined();
      expect(mergedCall![2]).toContain("/ip4/1.2.3.4/tcp/4001");
      expect(mergedCall![2]).toContain("/ip4/5.6.7.8/tcp/9999");
    } finally {
      manager.stop();
      db.close();
    }
  });
});
435
436// ============================================
437// Identity notification integration
438// ============================================
439
// Exercises the identity-notification wiring between ReplicationManager and
// the (mocked) network service: subscription on init(), state updates on
// incoming notifications, and unsubscription on stop().
describe("Identity notification integration", () => {
  let tmpDir: string;

  beforeEach(() => {
    tmpDir = mkdtempSync(join(tmpdir(), "peer-identity-test-"));
  });

  afterEach(() => {
    rmSync(tmpDir, { recursive: true, force: true });
  });

  /**
   * Open a database in the per-test temp dir and construct a
   * ReplicationManager around fresh mocks. init() is deliberately left to the
   * caller so that it runs inside the caller's try/finally.
   */
  async function buildHarness(dbFile: string, trackedDids: string[]) {
    const db = new Database(join(tmpDir, dbFile));
    const config = testConfig(tmpDir, trackedDids);

    const { RepoManager } = await import("../repo-manager.js");
    const { ReplicationManager } = await import("./replication-manager.js");
    const { DidResolver } = await import("../did-resolver.js");

    const repoManager = new RepoManager(db, config);
    repoManager.init();

    const mockNet = createMockNetworkService();
    const mockBlocks = createMockBlockStore();

    const manager = new ReplicationManager(
      db, config, repoManager, mockBlocks, mockNet, new DidResolver(),
    );

    return { db, manager, mockNet };
  }

  it("init() subscribes to identity topics for tracked DIDs", async () => {
    const { db, manager, mockNet } = await buildHarness(
      "test.db",
      ["did:plc:remote1", "did:plc:remote2"],
    );

    try {
      await manager.init();

      // Both tracked DIDs must have been subscribed.
      expect(mockNet.subscribeIdentityTopics).toHaveBeenCalled();
      expect(mockNet._subscribedIdentityDids).toContain("did:plc:remote1");
      expect(mockNet._subscribedIdentityDids).toContain("did:plc:remote2");

      // At least one identity handler must have been registered.
      expect(mockNet.onIdentityNotification).toHaveBeenCalled();
      expect(mockNet._identityHandlers.length).toBeGreaterThan(0);
    } finally {
      manager.stop();
      db.close();
    }
  });

  it("identity notification updates peer info immediately", async () => {
    const { db, manager, mockNet } = await buildHarness(
      "test-update.db",
      ["did:plc:remote1"],
    );

    try {
      await manager.init();

      // Start from a known, soon-to-be-stale peer record.
      const syncStorage = manager.getSyncStorage();
      syncStorage.upsertState({ did: "did:plc:remote1", pdsEndpoint: "https://pds.example.com" });
      syncStorage.updatePeerInfo("did:plc:remote1", "12D3KooWOldPeer", ["/ip4/1.2.3.4/tcp/4001"]);

      // Simulate identity notification
      mockNet._simulateIdentityNotification({
        did: "did:plc:remote1",
        peerId: "12D3KooWNewPeer",
        multiaddrs: ["/ip4/10.0.0.1/tcp/5001", "/ip4/10.0.0.2/tcp/5002"],
        time: new Date().toISOString(),
      });

      // Peer info should be updated immediately
      const updated = syncStorage.getState("did:plc:remote1");
      expect(updated!.peerId).toBe("12D3KooWNewPeer");
      expect(updated!.peerMultiaddrs).toEqual(["/ip4/10.0.0.1/tcp/5001", "/ip4/10.0.0.2/tcp/5002"]);
    } finally {
      manager.stop();
      db.close();
    }
  });

  it("identity notification for untracked DID is ignored", async () => {
    const { db, manager, mockNet } = await buildHarness(
      "test-ignore.db",
      ["did:plc:remote1"],
    );

    try {
      await manager.init();

      const syncStorage = manager.getSyncStorage();

      // Simulate identity notification for untracked DID
      mockNet._simulateIdentityNotification({
        did: "did:plc:unknown",
        peerId: "12D3KooWNewPeer",
        multiaddrs: ["/ip4/10.0.0.1/tcp/5001"],
        time: new Date().toISOString(),
      });

      // No state should exist for untracked DID
      expect(syncStorage.getState("did:plc:unknown")).toBeNull();
    } finally {
      manager.stop();
      db.close();
    }
  });

  it("stop() unsubscribes from identity topics", async () => {
    const { db, manager, mockNet } = await buildHarness(
      "test-unsub.db",
      ["did:plc:remote1"],
    );

    try {
      await manager.init();
      // stop() is the behavior under test, so it runs here rather than in finally.
      manager.stop();

      expect(mockNet.unsubscribeIdentityTopics).toHaveBeenCalled();
    } finally {
      db.close();
    }
  });

});
599});