atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 528 lines 16 kB view raw
1/** 2 * Gossipsub commit notification tests. 3 * 4 * Tests CBOR encoding/decoding, topic generation, E2E gossipsub 5 * between two Helia nodes, and ReplicationManager integration. 6 */ 7 8import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; 9import { mkdtempSync, rmSync } from "node:fs"; 10import { tmpdir } from "node:os"; 11import { join } from "node:path"; 12import Database from "better-sqlite3"; 13import type { Helia } from "@helia/interface"; 14import { SqliteBlockstore } from "../sqlite-blockstore.js"; 15import { SqliteDatastore } from "../sqlite-datastore.js"; 16import { 17 encode as cborEncode, 18 decode as cborDecode, 19} from "../cbor-compat.js"; 20import { 21 commitTopic, 22 COMMIT_TOPIC_PREFIX, 23 type CommitNotification, 24} from "../ipfs.js"; 25 26// ============================================ 27// Message encoding + topic tests 28// ============================================ 29 30describe("CommitNotification encoding", () => { 31 it("CBOR encode/decode round-trip", () => { 32 const notification: CommitNotification = { 33 did: "did:plc:abc123", 34 commit: "bafyreiabc", 35 rev: "3jui7kd2xxxx2", 36 time: "2024-01-01T00:00:00.000Z", 37 peer: "12D3KooWTest", 38 }; 39 40 const encoded = cborEncode(notification); 41 expect(encoded).toBeInstanceOf(Uint8Array); 42 expect(encoded.length).toBeGreaterThan(0); 43 44 const decoded = cborDecode(encoded) as CommitNotification; 45 expect(decoded.did).toBe(notification.did); 46 expect(decoded.commit).toBe(notification.commit); 47 expect(decoded.rev).toBe(notification.rev); 48 expect(decoded.time).toBe(notification.time); 49 expect(decoded.peer).toBe(notification.peer); 50 }); 51 52 it("topic generation from DID", () => { 53 const did = "did:plc:abc123"; 54 const topic = commitTopic(did); 55 expect(topic).toBe("/p2pds/commits/1/did:plc:abc123"); 56 expect(topic.startsWith(COMMIT_TOPIC_PREFIX)).toBe(true); 57 }); 58 59 it("different DIDs produce different topics", () => { 60 const topic1 = commitTopic("did:plc:aaa"); 61 const topic2 = commitTopic("did:plc:bbb"); 62 expect(topic1).not.toBe(topic2); 63 }); 64}); 65 66// ============================================ 67// E2E gossipsub test (two Helia nodes) 68// ============================================ 69 70describe("E2E gossipsub: two Helia nodes", () => { 71 let tmpDir: string; 72 let nodeA: Helia | null = null; 73 let nodeB: Helia | null = null; 74 let dbA: Database.Database | null = null; 75 let dbB: Database.Database | null = null; 76 77 beforeEach(() => { 78 tmpDir = mkdtempSync(join(tmpdir(), "gossipsub-e2e-test-")); 79 }); 80 81 afterEach(async () => { 82 const stops: Promise<void>[] = []; 83 if (nodeA) stops.push(nodeA.stop().catch(() => {})); 84 if (nodeB) stops.push(nodeB.stop().catch(() => {})); 85 await Promise.all(stops); 86 nodeA = null; 87 nodeB = null; 88 89 if (dbA) { dbA.close(); dbA = null; } 90 if (dbB) { dbB.close(); dbB = null; } 91 92 rmSync(tmpDir, { recursive: true, force: true }); 93 }); 94 95 /** 96 * Create a minimal Helia node with TCP + gossipsub for testing. 97 * Strips out all discovery, relay, etc. — just TCP + noise + yamux + identify + gossipsub. 98 */ 99 async function createGossipsubTestNode( 100 db: Database.Database, 101 ): Promise<Helia> { 102 const { createHelia } = await import("helia"); 103 const { noise } = await import("@chainsafe/libp2p-noise"); 104 const { yamux } = await import("@chainsafe/libp2p-yamux"); 105 const { tcp } = await import("@libp2p/tcp"); 106 const { identify } = await import("@libp2p/identify"); 107 const { gossipsub } = await import("@libp2p/gossipsub"); 108 const { createLibp2p } = await import("libp2p"); 109 110 const blockstore = new SqliteBlockstore(db); 111 const datastore = new SqliteDatastore(db); 112 113 const libp2p = await createLibp2p({ 114 addresses: { 115 listen: ["/ip4/127.0.0.1/tcp/0"], 116 }, 117 transports: [tcp()], 118 connectionEncrypters: [noise()], 119 streamMuxers: [yamux()], 120 services: { 121 identify: identify(), 122 pubsub: gossipsub({ 123 emitSelf: false, 124 allowPublishToZeroTopicPeers: true, 125 }), 126 }, 127 }); 128 129 return createHelia({ 130 libp2p, 131 blockstore: blockstore as any, 132 datastore: datastore as any, 133 }); 134 } 135 136 it("notification published by one node is received by connected peer", { timeout: 60_000 }, async () => { 137 dbA = new Database(join(tmpDir, "node-a.db")); 138 dbB = new Database(join(tmpDir, "node-b.db")); 139 nodeA = await createGossipsubTestNode(dbA); 140 nodeB = await createGossipsubTestNode(dbB); 141 142 // Connect the nodes 143 const addrsA = nodeA.libp2p.getMultiaddrs(); 144 expect(addrsA.length).toBeGreaterThan(0); 145 await nodeB.libp2p.dial(addrsA[0]!); 146 147 // Wait for connection 148 await waitFor(() => 149 nodeA!.libp2p.getConnections().length > 0 && 150 nodeB!.libp2p.getConnections().length > 0, 151 5_000, 152 ); 153 154 const testDid = "did:plc:gossiptest123"; 155 const topic = commitTopic(testDid); 156 157 // Access pubsub service 158 const pubsubA = (nodeA.libp2p.services as Record<string, unknown>).pubsub as { 159 subscribe(topic: string): void; 160 addEventListener(event: string, handler: (evt: unknown) => void): void; 161 }; 162 const pubsubB = (nodeB.libp2p.services as Record<string, unknown>).pubsub as { 163 subscribe(topic: string): void; 164 publish(topic: string, data: Uint8Array): Promise<unknown>; 165 }; 166 167 // Both nodes subscribe (needed for mesh formation) 168 const received: CommitNotification[] = []; 169 pubsubA.subscribe(topic); 170 pubsubB.subscribe(topic); 171 172 pubsubA.addEventListener("message", (evt: unknown) => { 173 try { 174 const detail = (evt as { detail: { topic: string; data: Uint8Array } }).detail; 175 if (detail.topic === topic) { 176 const notification = cborDecode(detail.data) as CommitNotification; 177 received.push(notification); 178 } 179 } catch { 180 // ignore decode errors in test 181 } 182 }); 183 184 // Wait for gossipsub mesh to form, then publish repeatedly until received. 185 // Gossipsub mesh formation requires multiple heartbeat cycles (~1s each). 186 // We publish every 2s until the message gets through. 187 const notification: CommitNotification = { 188 did: testDid, 189 commit: "bafyreiabc", 190 rev: "3jui7kd2xxxx2", 191 time: new Date().toISOString(), 192 peer: nodeB.libp2p.peerId.toString(), 193 }; 194 const data = cborEncode(notification); 195 196 await waitFor(async () => { 197 if (received.length > 0) return true; 198 await pubsubB.publish(topic, data).catch(() => {}); 199 await new Promise((r) => setTimeout(r, 1000)); 200 return received.length > 0; 201 }, 30_000, 500); 202 203 expect(received.length).toBe(1); 204 expect(received[0]!.did).toBe(testDid); 205 expect(received[0]!.commit).toBe("bafyreiabc"); 206 expect(received[0]!.rev).toBe("3jui7kd2xxxx2"); 207 expect(received[0]!.peer).toBe(nodeB.libp2p.peerId.toString()); 208 expect(typeof received[0]!.time).toBe("string"); 209 }); 210}); 211 212// ============================================ 213// ReplicationManager integration (mock NetworkService) 214// ============================================ 215 216describe("ReplicationManager gossipsub integration", () => { 217 let tmpDir: string; 218 219 beforeEach(() => { 220 tmpDir = mkdtempSync(join(tmpdir(), "repl-gossipsub-test-")); 221 }); 222 223 afterEach(() => { 224 rmSync(tmpDir, { recursive: true, force: true }); 225 }); 226 227 function createMockNetworkService() { 228 const handlers: Array<(n: CommitNotification) => void | Promise<void>> = []; 229 const subscribedDids: string[] = []; 230 const publishedNotifications: Array<{ did: string; commitCid: string; rev: string }> = []; 231 232 return { 233 provideBlocks: vi.fn().mockResolvedValue(undefined), 234 publishCommitNotification: vi.fn().mockImplementation( 235 async (did: string, commitCid: string, rev: string) => { 236 publishedNotifications.push({ did, commitCid, rev }); 237 }, 238 ), 239 onCommitNotification: vi.fn().mockImplementation( 240 (handler: (n: CommitNotification) => void | Promise<void>) => { 241 handlers.push(handler); 242 }, 243 ), 244 subscribeCommitTopics: vi.fn().mockImplementation( 245 (dids: string[]) => { 246 subscribedDids.push(...dids); 247 }, 248 ), 249 unsubscribeCommitTopics: vi.fn(), 250 getPeerId: vi.fn().mockReturnValue("12D3KooWMockPeer"), 251 getMultiaddrs: vi.fn().mockReturnValue(["/ip4/127.0.0.1/tcp/4001"]), 252 getConnectionCount: vi.fn().mockReturnValue(0), 253 getRemoteAddrs: vi.fn().mockReturnValue([]), 254 publishIdentityNotification: vi.fn().mockResolvedValue(undefined), 255 onIdentityNotification: vi.fn(), 256 subscribeIdentityTopics: vi.fn(), 257 unsubscribeIdentityTopics: vi.fn(), 258 provideForDid: vi.fn().mockResolvedValue(undefined), 259 findProvidersForDid: vi.fn().mockResolvedValue([]), 260 // Test helpers 261 _handlers: handlers, 262 _subscribedDids: subscribedDids, 263 _publishedNotifications: publishedNotifications, 264 _simulateNotification(notification: CommitNotification) { 265 for (const handler of handlers) { 266 handler(notification); 267 } 268 }, 269 }; 270 } 271 272 function createMockBlockStore() { 273 return { 274 putBlock: vi.fn().mockResolvedValue(undefined), 275 getBlock: vi.fn().mockResolvedValue(null), 276 hasBlock: vi.fn().mockResolvedValue(false), 277 putBlocks: vi.fn().mockResolvedValue(undefined), 278 deleteBlock: vi.fn().mockResolvedValue(undefined), 279 }; 280 } 281 282 it("init() subscribes to commit topics for tracked DIDs", async () => { 283 const Database = (await import("better-sqlite3")).default; 284 const { ReplicationManager } = await import("./replication-manager.js"); 285 const { DidResolver } = await import("../did-resolver.js"); 286 287 const db = new Database(join(tmpDir, "test.db")); 288 const config = { 289 DID: "did:plc:local", 290 HANDLE: "test.example.com", 291 PDS_HOSTNAME: "test.example.com", 292 AUTH_TOKEN: "test", 293 SIGNING_KEY: "0000000000000000000000000000000000000000000000000000000000000001", 294 SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr", 295 JWT_SECRET: "test", 296 PASSWORD_HASH: "$2a$10$test", 297 DATA_DIR: tmpDir, 298 PORT: 3000, 299 IPFS_ENABLED: true, 300 IPFS_NETWORKING: false, 301 REPLICATE_DIDS: ["did:plc:remote1", "did:plc:remote2"], 302 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 303 FIREHOSE_ENABLED: false, 304 RATE_LIMIT_ENABLED: false, 305 RATE_LIMIT_READ_PER_MIN: 300, 306 RATE_LIMIT_SYNC_PER_MIN: 30, 307 RATE_LIMIT_SESSION_PER_MIN: 10, 308 RATE_LIMIT_WRITE_PER_MIN: 200, 309 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 310 RATE_LIMIT_MAX_CONNECTIONS: 100, 311 RATE_LIMIT_FIREHOSE_PER_IP: 3, 312 OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 313 }; 314 315 const { RepoManager } = await import("../repo-manager.js"); 316 const repoManager = new RepoManager(db, config); 317 repoManager.init(); 318 319 const mockNet = createMockNetworkService(); 320 const mockBlocks = createMockBlockStore(); 321 const didResolver = new DidResolver(); 322 323 const manager = new ReplicationManager( 324 db, 325 config, 326 repoManager, 327 mockBlocks, 328 mockNet, 329 didResolver, 330 ); 331 332 try { 333 await manager.init(); 334 335 // Verify subscribeCommitTopics was called with tracked DIDs 336 expect(mockNet.subscribeCommitTopics).toHaveBeenCalled(); 337 expect(mockNet._subscribedDids).toContain("did:plc:remote1"); 338 expect(mockNet._subscribedDids).toContain("did:plc:remote2"); 339 340 // Verify onCommitNotification was called to register a handler 341 expect(mockNet.onCommitNotification).toHaveBeenCalled(); 342 expect(mockNet._handlers.length).toBeGreaterThan(0); 343 } finally { 344 manager.stop(); 345 db.close(); 346 } 347 }); 348 349 it("dedup: same rev notification does not trigger re-sync", async () => { 350 const Database = (await import("better-sqlite3")).default; 351 const { ReplicationManager } = await import("./replication-manager.js"); 352 const { DidResolver } = await import("../did-resolver.js"); 353 354 const db = new Database(join(tmpDir, "test-dedup.db")); 355 const config = { 356 DID: "did:plc:local", 357 HANDLE: "test.example.com", 358 PDS_HOSTNAME: "test.example.com", 359 AUTH_TOKEN: "test", 360 SIGNING_KEY: "0000000000000000000000000000000000000000000000000000000000000001", 361 SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr", 362 JWT_SECRET: "test", 363 PASSWORD_HASH: "$2a$10$test", 364 DATA_DIR: tmpDir, 365 PORT: 3000, 366 IPFS_ENABLED: true, 367 IPFS_NETWORKING: false, 368 REPLICATE_DIDS: ["did:plc:remote1"], 369 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 370 FIREHOSE_ENABLED: false, 371 RATE_LIMIT_ENABLED: false, 372 RATE_LIMIT_READ_PER_MIN: 300, 373 RATE_LIMIT_SYNC_PER_MIN: 30, 374 RATE_LIMIT_SESSION_PER_MIN: 10, 375 RATE_LIMIT_WRITE_PER_MIN: 200, 376 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 377 RATE_LIMIT_MAX_CONNECTIONS: 100, 378 RATE_LIMIT_FIREHOSE_PER_IP: 3, 379 OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 380 }; 381 382 const { RepoManager } = await import("../repo-manager.js"); 383 const repoManager = new RepoManager(db, config); 384 repoManager.init(); 385 386 const mockNet = createMockNetworkService(); 387 const mockBlocks = createMockBlockStore(); 388 const didResolver = new DidResolver(); 389 390 const manager = new ReplicationManager( 391 db, 392 config, 393 repoManager, 394 mockBlocks, 395 mockNet, 396 didResolver, 397 ); 398 399 try { 400 await manager.init(); 401 402 // Spy on syncDid to see if it gets called 403 const syncDidSpy = vi.spyOn(manager, "syncDid").mockResolvedValue(undefined); 404 405 const notification: CommitNotification = { 406 did: "did:plc:remote1", 407 commit: "bafyreiabc", 408 rev: "3jui7kd2xxxx2", 409 time: new Date().toISOString(), 410 peer: "12D3KooWOtherPeer", 411 }; 412 413 // First notification should trigger syncDid 414 mockNet._simulateNotification(notification); 415 await new Promise((r) => setTimeout(r, 100)); 416 expect(syncDidSpy).toHaveBeenCalledTimes(1); 417 418 // Same rev notification should be deduped 419 mockNet._simulateNotification(notification); 420 await new Promise((r) => setTimeout(r, 100)); 421 expect(syncDidSpy).toHaveBeenCalledTimes(1); // still 1 422 423 // Different rev should trigger another syncDid 424 mockNet._simulateNotification({ 425 ...notification, 426 rev: "3jui7kd2yyyy3", 427 }); 428 await new Promise((r) => setTimeout(r, 100)); 429 expect(syncDidSpy).toHaveBeenCalledTimes(2); 430 431 syncDidSpy.mockRestore(); 432 } finally { 433 manager.stop(); 434 db.close(); 435 } 436 }); 437 438 it("notification for untracked DID is ignored", async () => { 439 const Database = (await import("better-sqlite3")).default; 440 const { ReplicationManager } = await import("./replication-manager.js"); 441 const { DidResolver } = await import("../did-resolver.js"); 442 443 const db = new Database(join(tmpDir, "test-untracked.db")); 444 const config = { 445 DID: "did:plc:local", 446 HANDLE: "test.example.com", 447 PDS_HOSTNAME: "test.example.com", 448 AUTH_TOKEN: "test", 449 SIGNING_KEY: "0000000000000000000000000000000000000000000000000000000000000001", 450 SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr", 451 JWT_SECRET: "test", 452 PASSWORD_HASH: "$2a$10$test", 453 DATA_DIR: tmpDir, 454 PORT: 3000, 455 IPFS_ENABLED: true, 456 IPFS_NETWORKING: false, 457 REPLICATE_DIDS: ["did:plc:remote1"], 458 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 459 FIREHOSE_ENABLED: false, 460 RATE_LIMIT_ENABLED: false, 461 RATE_LIMIT_READ_PER_MIN: 300, 462 RATE_LIMIT_SYNC_PER_MIN: 30, 463 RATE_LIMIT_SESSION_PER_MIN: 10, 464 RATE_LIMIT_WRITE_PER_MIN: 200, 465 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 466 RATE_LIMIT_MAX_CONNECTIONS: 100, 467 RATE_LIMIT_FIREHOSE_PER_IP: 3, 468 OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 469 }; 470 471 const { RepoManager } = await import("../repo-manager.js"); 472 const repoManager = new RepoManager(db, config); 473 repoManager.init(); 474 475 const mockNet = createMockNetworkService(); 476 const mockBlocks = createMockBlockStore(); 477 const didResolver = new DidResolver(); 478 479 const manager = new ReplicationManager( 480 db, 481 config, 482 repoManager, 483 mockBlocks, 484 mockNet, 485 didResolver, 486 ); 487 488 try { 489 await manager.init(); 490 491 const syncDidSpy = vi.spyOn(manager, "syncDid").mockResolvedValue(undefined); 492 493 // Notification for a DID we're NOT tracking 494 mockNet._simulateNotification({ 495 did: "did:plc:unknown", 496 commit: "bafyreiabc", 497 rev: "3jui7kd2xxxx2", 498 time: new Date().toISOString(), 499 peer: "12D3KooWOther", 500 }); 501 await new Promise((r) => setTimeout(r, 100)); 502 503 expect(syncDidSpy).not.toHaveBeenCalled(); 504 505 syncDidSpy.mockRestore(); 506 } finally { 507 manager.stop(); 508 db.close(); 509 } 510 }); 511}); 512 513// ============================================ 514// Helpers 515// ============================================ 516 517async function waitFor( 518 fn: () => Promise<boolean> | boolean, 519 timeoutMs: number = 10_000, 520 intervalMs: number = 200, 521): Promise<void> { 522 const deadline = Date.now() + timeoutMs; 523 while (Date.now() < deadline) { 524 if (await fn()) return; 525 await new Promise((r) => setTimeout(r, intervalMs)); 526 } 527 throw new Error(`waitFor timed out after ${timeoutMs}ms`); 528}