atproto user agency toolkit for individuals and groups
1/**
2 * Gossipsub commit notification tests.
3 *
4 * Tests CBOR encoding/decoding, topic generation, E2E gossipsub
5 * between two Helia nodes, and ReplicationManager integration.
6 */
7
8import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
9import { mkdtempSync, rmSync } from "node:fs";
10import { tmpdir } from "node:os";
11import { join } from "node:path";
12import Database from "better-sqlite3";
13import type { Helia } from "@helia/interface";
14import { SqliteBlockstore } from "../sqlite-blockstore.js";
15import { SqliteDatastore } from "../sqlite-datastore.js";
16import {
17 encode as cborEncode,
18 decode as cborDecode,
19} from "../cbor-compat.js";
20import {
21 commitTopic,
22 COMMIT_TOPIC_PREFIX,
23 type CommitNotification,
24} from "../ipfs.js";
25
26// ============================================
27// Message encoding + topic tests
28// ============================================
29
30describe("CommitNotification encoding", () => {
31 it("CBOR encode/decode round-trip", () => {
32 const notification: CommitNotification = {
33 did: "did:plc:abc123",
34 commit: "bafyreiabc",
35 rev: "3jui7kd2xxxx2",
36 time: "2024-01-01T00:00:00.000Z",
37 peer: "12D3KooWTest",
38 };
39
40 const encoded = cborEncode(notification);
41 expect(encoded).toBeInstanceOf(Uint8Array);
42 expect(encoded.length).toBeGreaterThan(0);
43
44 const decoded = cborDecode(encoded) as CommitNotification;
45 expect(decoded.did).toBe(notification.did);
46 expect(decoded.commit).toBe(notification.commit);
47 expect(decoded.rev).toBe(notification.rev);
48 expect(decoded.time).toBe(notification.time);
49 expect(decoded.peer).toBe(notification.peer);
50 });
51
52 it("topic generation from DID", () => {
53 const did = "did:plc:abc123";
54 const topic = commitTopic(did);
55 expect(topic).toBe("/p2pds/commits/1/did:plc:abc123");
56 expect(topic.startsWith(COMMIT_TOPIC_PREFIX)).toBe(true);
57 });
58
59 it("different DIDs produce different topics", () => {
60 const topic1 = commitTopic("did:plc:aaa");
61 const topic2 = commitTopic("did:plc:bbb");
62 expect(topic1).not.toBe(topic2);
63 });
64});
65
66// ============================================
67// E2E gossipsub test (two Helia nodes)
68// ============================================
69
70describe("E2E gossipsub: two Helia nodes", () => {
71 let tmpDir: string;
72 let nodeA: Helia | null = null;
73 let nodeB: Helia | null = null;
74 let dbA: Database.Database | null = null;
75 let dbB: Database.Database | null = null;
76
77 beforeEach(() => {
78 tmpDir = mkdtempSync(join(tmpdir(), "gossipsub-e2e-test-"));
79 });
80
81 afterEach(async () => {
82 const stops: Promise<void>[] = [];
83 if (nodeA) stops.push(nodeA.stop().catch(() => {}));
84 if (nodeB) stops.push(nodeB.stop().catch(() => {}));
85 await Promise.all(stops);
86 nodeA = null;
87 nodeB = null;
88
89 if (dbA) { dbA.close(); dbA = null; }
90 if (dbB) { dbB.close(); dbB = null; }
91
92 rmSync(tmpDir, { recursive: true, force: true });
93 });
94
95 /**
96 * Create a minimal Helia node with TCP + gossipsub for testing.
97 * Strips out all discovery, relay, etc. — just TCP + noise + yamux + identify + gossipsub.
98 */
99 async function createGossipsubTestNode(
100 db: Database.Database,
101 ): Promise<Helia> {
102 const { createHelia } = await import("helia");
103 const { noise } = await import("@chainsafe/libp2p-noise");
104 const { yamux } = await import("@chainsafe/libp2p-yamux");
105 const { tcp } = await import("@libp2p/tcp");
106 const { identify } = await import("@libp2p/identify");
107 const { gossipsub } = await import("@libp2p/gossipsub");
108 const { createLibp2p } = await import("libp2p");
109
110 const blockstore = new SqliteBlockstore(db);
111 const datastore = new SqliteDatastore(db);
112
113 const libp2p = await createLibp2p({
114 addresses: {
115 listen: ["/ip4/127.0.0.1/tcp/0"],
116 },
117 transports: [tcp()],
118 connectionEncrypters: [noise()],
119 streamMuxers: [yamux()],
120 services: {
121 identify: identify(),
122 pubsub: gossipsub({
123 emitSelf: false,
124 allowPublishToZeroTopicPeers: true,
125 }),
126 },
127 });
128
129 return createHelia({
130 libp2p,
131 blockstore: blockstore as any,
132 datastore: datastore as any,
133 });
134 }
135
136 it("notification published by one node is received by connected peer", { timeout: 60_000 }, async () => {
137 dbA = new Database(join(tmpDir, "node-a.db"));
138 dbB = new Database(join(tmpDir, "node-b.db"));
139 nodeA = await createGossipsubTestNode(dbA);
140 nodeB = await createGossipsubTestNode(dbB);
141
142 // Connect the nodes
143 const addrsA = nodeA.libp2p.getMultiaddrs();
144 expect(addrsA.length).toBeGreaterThan(0);
145 await nodeB.libp2p.dial(addrsA[0]!);
146
147 // Wait for connection
148 await waitFor(() =>
149 nodeA!.libp2p.getConnections().length > 0 &&
150 nodeB!.libp2p.getConnections().length > 0,
151 5_000,
152 );
153
154 const testDid = "did:plc:gossiptest123";
155 const topic = commitTopic(testDid);
156
157 // Access pubsub service
158 const pubsubA = (nodeA.libp2p.services as Record<string, unknown>).pubsub as {
159 subscribe(topic: string): void;
160 addEventListener(event: string, handler: (evt: unknown) => void): void;
161 };
162 const pubsubB = (nodeB.libp2p.services as Record<string, unknown>).pubsub as {
163 subscribe(topic: string): void;
164 publish(topic: string, data: Uint8Array): Promise<unknown>;
165 };
166
167 // Both nodes subscribe (needed for mesh formation)
168 const received: CommitNotification[] = [];
169 pubsubA.subscribe(topic);
170 pubsubB.subscribe(topic);
171
172 pubsubA.addEventListener("message", (evt: unknown) => {
173 try {
174 const detail = (evt as { detail: { topic: string; data: Uint8Array } }).detail;
175 if (detail.topic === topic) {
176 const notification = cborDecode(detail.data) as CommitNotification;
177 received.push(notification);
178 }
179 } catch {
180 // ignore decode errors in test
181 }
182 });
183
184 // Wait for gossipsub mesh to form, then publish repeatedly until received.
185 // Gossipsub mesh formation requires multiple heartbeat cycles (~1s each).
186 // We publish every 2s until the message gets through.
187 const notification: CommitNotification = {
188 did: testDid,
189 commit: "bafyreiabc",
190 rev: "3jui7kd2xxxx2",
191 time: new Date().toISOString(),
192 peer: nodeB.libp2p.peerId.toString(),
193 };
194 const data = cborEncode(notification);
195
196 await waitFor(async () => {
197 if (received.length > 0) return true;
198 await pubsubB.publish(topic, data).catch(() => {});
199 await new Promise((r) => setTimeout(r, 1000));
200 return received.length > 0;
201 }, 30_000, 500);
202
203 expect(received.length).toBe(1);
204 expect(received[0]!.did).toBe(testDid);
205 expect(received[0]!.commit).toBe("bafyreiabc");
206 expect(received[0]!.rev).toBe("3jui7kd2xxxx2");
207 expect(received[0]!.peer).toBe(nodeB.libp2p.peerId.toString());
208 expect(typeof received[0]!.time).toBe("string");
209 });
210});
211
212// ============================================
213// ReplicationManager integration (mock NetworkService)
214// ============================================
215
216describe("ReplicationManager gossipsub integration", () => {
217 let tmpDir: string;
218
219 beforeEach(() => {
220 tmpDir = mkdtempSync(join(tmpdir(), "repl-gossipsub-test-"));
221 });
222
223 afterEach(() => {
224 rmSync(tmpDir, { recursive: true, force: true });
225 });
226
227 function createMockNetworkService() {
228 const handlers: Array<(n: CommitNotification) => void | Promise<void>> = [];
229 const subscribedDids: string[] = [];
230 const publishedNotifications: Array<{ did: string; commitCid: string; rev: string }> = [];
231
232 return {
233 provideBlocks: vi.fn().mockResolvedValue(undefined),
234 publishCommitNotification: vi.fn().mockImplementation(
235 async (did: string, commitCid: string, rev: string) => {
236 publishedNotifications.push({ did, commitCid, rev });
237 },
238 ),
239 onCommitNotification: vi.fn().mockImplementation(
240 (handler: (n: CommitNotification) => void | Promise<void>) => {
241 handlers.push(handler);
242 },
243 ),
244 subscribeCommitTopics: vi.fn().mockImplementation(
245 (dids: string[]) => {
246 subscribedDids.push(...dids);
247 },
248 ),
249 unsubscribeCommitTopics: vi.fn(),
250 getPeerId: vi.fn().mockReturnValue("12D3KooWMockPeer"),
251 getMultiaddrs: vi.fn().mockReturnValue(["/ip4/127.0.0.1/tcp/4001"]),
252 getConnectionCount: vi.fn().mockReturnValue(0),
253 getRemoteAddrs: vi.fn().mockReturnValue([]),
254 publishIdentityNotification: vi.fn().mockResolvedValue(undefined),
255 onIdentityNotification: vi.fn(),
256 subscribeIdentityTopics: vi.fn(),
257 unsubscribeIdentityTopics: vi.fn(),
258 provideForDid: vi.fn().mockResolvedValue(undefined),
259 findProvidersForDid: vi.fn().mockResolvedValue([]),
260 // Test helpers
261 _handlers: handlers,
262 _subscribedDids: subscribedDids,
263 _publishedNotifications: publishedNotifications,
264 _simulateNotification(notification: CommitNotification) {
265 for (const handler of handlers) {
266 handler(notification);
267 }
268 },
269 };
270 }
271
272 function createMockBlockStore() {
273 return {
274 putBlock: vi.fn().mockResolvedValue(undefined),
275 getBlock: vi.fn().mockResolvedValue(null),
276 hasBlock: vi.fn().mockResolvedValue(false),
277 putBlocks: vi.fn().mockResolvedValue(undefined),
278 deleteBlock: vi.fn().mockResolvedValue(undefined),
279 };
280 }
281
282 it("init() subscribes to commit topics for tracked DIDs", async () => {
283 const Database = (await import("better-sqlite3")).default;
284 const { ReplicationManager } = await import("./replication-manager.js");
285 const { DidResolver } = await import("../did-resolver.js");
286
287 const db = new Database(join(tmpDir, "test.db"));
288 const config = {
289 DID: "did:plc:local",
290 HANDLE: "test.example.com",
291 PDS_HOSTNAME: "test.example.com",
292 AUTH_TOKEN: "test",
293 SIGNING_KEY: "0000000000000000000000000000000000000000000000000000000000000001",
294 SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr",
295 JWT_SECRET: "test",
296 PASSWORD_HASH: "$2a$10$test",
297 DATA_DIR: tmpDir,
298 PORT: 3000,
299 IPFS_ENABLED: true,
300 IPFS_NETWORKING: false,
301 REPLICATE_DIDS: ["did:plc:remote1", "did:plc:remote2"],
302 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos",
303 FIREHOSE_ENABLED: false,
304 RATE_LIMIT_ENABLED: false,
305 RATE_LIMIT_READ_PER_MIN: 300,
306 RATE_LIMIT_SYNC_PER_MIN: 30,
307 RATE_LIMIT_SESSION_PER_MIN: 10,
308 RATE_LIMIT_WRITE_PER_MIN: 200,
309 RATE_LIMIT_CHALLENGE_PER_MIN: 20,
310 RATE_LIMIT_MAX_CONNECTIONS: 100,
311 RATE_LIMIT_FIREHOSE_PER_IP: 3,
312 OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000",
313 };
314
315 const { RepoManager } = await import("../repo-manager.js");
316 const repoManager = new RepoManager(db, config);
317 repoManager.init();
318
319 const mockNet = createMockNetworkService();
320 const mockBlocks = createMockBlockStore();
321 const didResolver = new DidResolver();
322
323 const manager = new ReplicationManager(
324 db,
325 config,
326 repoManager,
327 mockBlocks,
328 mockNet,
329 didResolver,
330 );
331
332 try {
333 await manager.init();
334
335 // Verify subscribeCommitTopics was called with tracked DIDs
336 expect(mockNet.subscribeCommitTopics).toHaveBeenCalled();
337 expect(mockNet._subscribedDids).toContain("did:plc:remote1");
338 expect(mockNet._subscribedDids).toContain("did:plc:remote2");
339
340 // Verify onCommitNotification was called to register a handler
341 expect(mockNet.onCommitNotification).toHaveBeenCalled();
342 expect(mockNet._handlers.length).toBeGreaterThan(0);
343 } finally {
344 manager.stop();
345 db.close();
346 }
347 });
348
349 it("dedup: same rev notification does not trigger re-sync", async () => {
350 const Database = (await import("better-sqlite3")).default;
351 const { ReplicationManager } = await import("./replication-manager.js");
352 const { DidResolver } = await import("../did-resolver.js");
353
354 const db = new Database(join(tmpDir, "test-dedup.db"));
355 const config = {
356 DID: "did:plc:local",
357 HANDLE: "test.example.com",
358 PDS_HOSTNAME: "test.example.com",
359 AUTH_TOKEN: "test",
360 SIGNING_KEY: "0000000000000000000000000000000000000000000000000000000000000001",
361 SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr",
362 JWT_SECRET: "test",
363 PASSWORD_HASH: "$2a$10$test",
364 DATA_DIR: tmpDir,
365 PORT: 3000,
366 IPFS_ENABLED: true,
367 IPFS_NETWORKING: false,
368 REPLICATE_DIDS: ["did:plc:remote1"],
369 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos",
370 FIREHOSE_ENABLED: false,
371 RATE_LIMIT_ENABLED: false,
372 RATE_LIMIT_READ_PER_MIN: 300,
373 RATE_LIMIT_SYNC_PER_MIN: 30,
374 RATE_LIMIT_SESSION_PER_MIN: 10,
375 RATE_LIMIT_WRITE_PER_MIN: 200,
376 RATE_LIMIT_CHALLENGE_PER_MIN: 20,
377 RATE_LIMIT_MAX_CONNECTIONS: 100,
378 RATE_LIMIT_FIREHOSE_PER_IP: 3,
379 OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000",
380 };
381
382 const { RepoManager } = await import("../repo-manager.js");
383 const repoManager = new RepoManager(db, config);
384 repoManager.init();
385
386 const mockNet = createMockNetworkService();
387 const mockBlocks = createMockBlockStore();
388 const didResolver = new DidResolver();
389
390 const manager = new ReplicationManager(
391 db,
392 config,
393 repoManager,
394 mockBlocks,
395 mockNet,
396 didResolver,
397 );
398
399 try {
400 await manager.init();
401
402 // Spy on syncDid to see if it gets called
403 const syncDidSpy = vi.spyOn(manager, "syncDid").mockResolvedValue(undefined);
404
405 const notification: CommitNotification = {
406 did: "did:plc:remote1",
407 commit: "bafyreiabc",
408 rev: "3jui7kd2xxxx2",
409 time: new Date().toISOString(),
410 peer: "12D3KooWOtherPeer",
411 };
412
413 // First notification should trigger syncDid
414 mockNet._simulateNotification(notification);
415 await new Promise((r) => setTimeout(r, 100));
416 expect(syncDidSpy).toHaveBeenCalledTimes(1);
417
418 // Same rev notification should be deduped
419 mockNet._simulateNotification(notification);
420 await new Promise((r) => setTimeout(r, 100));
421 expect(syncDidSpy).toHaveBeenCalledTimes(1); // still 1
422
423 // Different rev should trigger another syncDid
424 mockNet._simulateNotification({
425 ...notification,
426 rev: "3jui7kd2yyyy3",
427 });
428 await new Promise((r) => setTimeout(r, 100));
429 expect(syncDidSpy).toHaveBeenCalledTimes(2);
430
431 syncDidSpy.mockRestore();
432 } finally {
433 manager.stop();
434 db.close();
435 }
436 });
437
438 it("notification for untracked DID is ignored", async () => {
439 const Database = (await import("better-sqlite3")).default;
440 const { ReplicationManager } = await import("./replication-manager.js");
441 const { DidResolver } = await import("../did-resolver.js");
442
443 const db = new Database(join(tmpDir, "test-untracked.db"));
444 const config = {
445 DID: "did:plc:local",
446 HANDLE: "test.example.com",
447 PDS_HOSTNAME: "test.example.com",
448 AUTH_TOKEN: "test",
449 SIGNING_KEY: "0000000000000000000000000000000000000000000000000000000000000001",
450 SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr",
451 JWT_SECRET: "test",
452 PASSWORD_HASH: "$2a$10$test",
453 DATA_DIR: tmpDir,
454 PORT: 3000,
455 IPFS_ENABLED: true,
456 IPFS_NETWORKING: false,
457 REPLICATE_DIDS: ["did:plc:remote1"],
458 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos",
459 FIREHOSE_ENABLED: false,
460 RATE_LIMIT_ENABLED: false,
461 RATE_LIMIT_READ_PER_MIN: 300,
462 RATE_LIMIT_SYNC_PER_MIN: 30,
463 RATE_LIMIT_SESSION_PER_MIN: 10,
464 RATE_LIMIT_WRITE_PER_MIN: 200,
465 RATE_LIMIT_CHALLENGE_PER_MIN: 20,
466 RATE_LIMIT_MAX_CONNECTIONS: 100,
467 RATE_LIMIT_FIREHOSE_PER_IP: 3,
468 OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000",
469 };
470
471 const { RepoManager } = await import("../repo-manager.js");
472 const repoManager = new RepoManager(db, config);
473 repoManager.init();
474
475 const mockNet = createMockNetworkService();
476 const mockBlocks = createMockBlockStore();
477 const didResolver = new DidResolver();
478
479 const manager = new ReplicationManager(
480 db,
481 config,
482 repoManager,
483 mockBlocks,
484 mockNet,
485 didResolver,
486 );
487
488 try {
489 await manager.init();
490
491 const syncDidSpy = vi.spyOn(manager, "syncDid").mockResolvedValue(undefined);
492
493 // Notification for a DID we're NOT tracking
494 mockNet._simulateNotification({
495 did: "did:plc:unknown",
496 commit: "bafyreiabc",
497 rev: "3jui7kd2xxxx2",
498 time: new Date().toISOString(),
499 peer: "12D3KooWOther",
500 });
501 await new Promise((r) => setTimeout(r, 100));
502
503 expect(syncDidSpy).not.toHaveBeenCalled();
504
505 syncDidSpy.mockRestore();
506 } finally {
507 manager.stop();
508 db.close();
509 }
510 });
511});
512
513// ============================================
514// Helpers
515// ============================================
516
517async function waitFor(
518 fn: () => Promise<boolean> | boolean,
519 timeoutMs: number = 10_000,
520 intervalMs: number = 200,
521): Promise<void> {
522 const deadline = Date.now() + timeoutMs;
523 while (Date.now() < deadline) {
524 if (await fn()) return;
525 await new Promise((r) => setTimeout(r, intervalMs));
526 }
527 throw new Error(`waitFor timed out after ${timeoutMs}ms`);
528}