/** * End-to-end networking integration test. * * Proves two p2pds nodes can discover each other and exchange data * over the network using libp2p/Helia bitswap. * * These tests create real Helia nodes with TCP networking on localhost, * connect them directly, and verify block exchange via bitswap. */ import { describe, it, expect, beforeEach, afterEach } from "vitest"; import { mkdtempSync, rmSync } from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; import Database from "better-sqlite3"; import { CID } from "multiformats"; // @ts-ignore -- multiformats v9 subpath exports lack type declarations import * as raw from "multiformats/codecs/raw"; // @ts-ignore -- multiformats v9 subpath exports lack type declarations import { sha256 } from "multiformats/hashes/sha2"; import type { Helia } from "@helia/interface"; import { SqliteBlockstore } from "../sqlite-blockstore.js"; import { SqliteDatastore } from "../sqlite-datastore.js"; /** * Create a CID from raw bytes using SHA-256. */ async function cidFromBytes(bytes: Uint8Array): Promise { const hash = await sha256.digest(bytes); return CID.create(1, raw.code, hash); } /** * Collect an async generator of Uint8Array chunks into a single Uint8Array. * Helia/interface-blockstore's get() returns AsyncGenerator, * not a plain Uint8Array. The chunks may be Node.js Buffers, so we * normalize to a plain Uint8Array for consistent comparison. */ async function collectBytes( gen: AsyncIterable, ): Promise { const chunks: Uint8Array[] = []; for await (const chunk of gen) { chunks.push(chunk); } if (chunks.length === 0) return new Uint8Array(0); // Always return a plain Uint8Array (not a Buffer subclass) if (chunks.length === 1) { const c = chunks[0]!; return new Uint8Array(c.buffer, c.byteOffset, c.byteLength); } const total = chunks.reduce((acc, c) => acc + c.length, 0); const result = new Uint8Array(total); let offset = 0; for (const c of chunks) { result.set(c, offset); offset += c.length; } return result; } /** * Create a minimal Helia node with TCP-only networking on localhost. * * Strips out bootstrap peers, mDNS, delegated routing, autoNAT, autoTLS, * uPnP, circuit relay, WebRTC, and WebSockets to avoid any external * network dependencies. Nodes must be connected manually via dial(). */ async function createTestHeliaNode( db: Database.Database, ): Promise { const { createHelia } = await import("helia"); const { noise } = await import("@chainsafe/libp2p-noise"); const { yamux } = await import("@chainsafe/libp2p-yamux"); const { tcp } = await import("@libp2p/tcp"); const { identify } = await import("@libp2p/identify"); const { bitswap } = await import("@helia/block-brokers"); const { libp2pRouting } = await import("@helia/routers"); const { createLibp2p } = await import("libp2p"); const blockstore = new SqliteBlockstore(db); const datastore = new SqliteDatastore(db); const libp2p = await createLibp2p({ addresses: { listen: ["/ip4/127.0.0.1/tcp/0"], }, transports: [tcp()], connectionEncrypters: [noise()], streamMuxers: [yamux()], services: { identify: identify(), }, // No peer discovery -- we connect manually }); const helia = await createHelia({ libp2p, blockstore: blockstore as any, datastore: datastore as any, blockBrokers: [bitswap()], routers: [libp2pRouting(libp2p)], }); return helia; } /** * Wait for a condition to become true, with a timeout. */ async function waitFor( fn: () => Promise | boolean, timeoutMs: number = 10_000, intervalMs: number = 200, ): Promise { const deadline = Date.now() + timeoutMs; while (Date.now() < deadline) { if (await fn()) return; await new Promise((r) => setTimeout(r, intervalMs)); } throw new Error(`waitFor timed out after ${timeoutMs}ms`); } describe("E2E networking: two Helia nodes", () => { let tmpDir: string; let nodeA: Helia | null = null; let nodeB: Helia | null = null; let dbA: Database.Database | null = null; let dbB: Database.Database | null = null; beforeEach(() => { tmpDir = mkdtempSync(join(tmpdir(), "e2e-networking-test-")); }); afterEach(async () => { // Stop nodes in parallel for faster cleanup const stops: Promise[] = []; if (nodeA) stops.push(nodeA.stop().catch(() => {})); if (nodeB) stops.push(nodeB.stop().catch(() => {})); await Promise.all(stops); nodeA = null; nodeB = null; if (dbA) { dbA.close(); dbA = null; } if (dbB) { dbB.close(); dbB = null; } rmSync(tmpDir, { recursive: true, force: true }); }); it("nodes can connect and exchange blocks via bitswap", { timeout: 60_000 }, async () => { // 1. Create two Helia nodes with real TCP networking on localhost dbA = new Database(join(tmpDir, "node-a.db")); dbB = new Database(join(tmpDir, "node-b.db")); nodeA = await createTestHeliaNode(dbA); nodeB = await createTestHeliaNode(dbB); // 2. Verify both nodes are running and have addresses const addrsA = nodeA.libp2p.getMultiaddrs(); const addrsB = nodeB.libp2p.getMultiaddrs(); expect(addrsA.length).toBeGreaterThan(0); expect(addrsB.length).toBeGreaterThan(0); const peerIdA = nodeA.libp2p.peerId.toString(); const peerIdB = nodeB.libp2p.peerId.toString(); expect(peerIdA).toBeTruthy(); expect(peerIdB).toBeTruthy(); expect(peerIdA).not.toBe(peerIdB); // 3. Connect node B to node A await nodeB.libp2p.dial(addrsA[0]!); // Wait for the connection to be established in both directions await waitFor( () => nodeA!.libp2p.getConnections().length > 0 && nodeB!.libp2p.getConnections().length > 0, 5_000, ); expect(nodeA.libp2p.getConnections().length).toBeGreaterThan(0); expect(nodeB.libp2p.getConnections().length).toBeGreaterThan(0); // 4. Store blocks on node A const testData = [ new TextEncoder().encode("hello from node A"), new TextEncoder().encode("second block of data"), new TextEncoder().encode("third block for good measure"), ]; const cids: CID[] = []; for (const bytes of testData) { const cid = await cidFromBytes(bytes); await nodeA.blockstore.put(cid, bytes); cids.push(cid); } // Verify blocks are on node A for (const cid of cids) { expect(await nodeA.blockstore.has(cid)).toBe(true); } // Verify blocks are NOT on node B yet for (const cid of cids) { expect( await nodeB.blockstore.has(cid, { offline: true } as any), ).toBe(false); } // 5. Retrieve blocks on node B via bitswap (network fetch) // blockstore.get() returns AsyncGenerator for (let i = 0; i < cids.length; i++) { const cid = cids[i]!; const signal = AbortSignal.timeout(15_000); const retrieved = await collectBytes( nodeB.blockstore.get(cid, { signal }) as any, ); expect(retrieved).toBeDefined(); expect(retrieved.length).toBe(testData[i]!.length); expect(retrieved).toEqual(testData[i]!); } // 6. Verify blocks are now cached on node B for (const cid of cids) { expect(await nodeB.blockstore.has(cid)).toBe(true); } }); it("IpfsService instances can be connected and peer info is correct", { timeout: 60_000 }, async () => { // This test verifies that IpfsService with networking=true // exposes correct peer identity and multiaddr information. const { IpfsService } = await import("../ipfs.js"); const svcDbA = new Database(join(tmpDir, "svc-a.db")); const svcDbB = new Database(join(tmpDir, "svc-b.db")); const serviceA = new IpfsService({ db: svcDbA, networking: true, }); const serviceB = new IpfsService({ db: svcDbB, networking: true, }); try { await serviceA.start(); await serviceB.start(); // Verify peer IDs are present and distinct const peerIdA = serviceA.getPeerId(); const peerIdB = serviceB.getPeerId(); expect(peerIdA).not.toBeNull(); expect(peerIdB).not.toBeNull(); expect(peerIdA).not.toBe(peerIdB); // Verify multiaddrs are available const addrsA = serviceA.getMultiaddrs(); const addrsB = serviceB.getMultiaddrs(); expect(addrsA.length).toBeGreaterThan(0); expect(addrsB.length).toBeGreaterThan(0); // Verify that IpfsService reports running expect(serviceA.isRunning()).toBe(true); expect(serviceB.isRunning()).toBe(true); } finally { if (serviceA.isRunning()) await serviceA.stop(); if (serviceB.isRunning()) await serviceB.stop(); svcDbA.close(); svcDbB.close(); } }); it("block stored on one node is retrievable from the other after connection", { timeout: 60_000 }, async () => { // A focused test: one block, two nodes, verify bitswap fetch. dbA = new Database(join(tmpDir, "single-a.db")); dbB = new Database(join(tmpDir, "single-b.db")); nodeA = await createTestHeliaNode(dbA); nodeB = await createTestHeliaNode(dbB); // Connect const addrsA = nodeA.libp2p.getMultiaddrs(); await nodeB.libp2p.dial(addrsA[0]!); await waitFor( () => nodeB!.libp2p.getConnections().length > 0, 5_000, ); // Store a single block on node A const data = new TextEncoder().encode( "single block e2e test payload", ); const cid = await cidFromBytes(data); await nodeA.blockstore.put(cid, data); // Fetch from node B (will use bitswap to get from node A) const signal = AbortSignal.timeout(15_000); const fetched = await collectBytes( nodeB.blockstore.get(cid, { signal }) as any, ); expect(fetched).toEqual(data); // Verify it is now cached locally on node B const cachedLocally = await collectBytes( nodeB.blockstore.get(cid, { offline: true }) as any, ); expect(cachedLocally).toEqual(data); }); it("nodes discover each other's peer IDs after connection", { timeout: 30_000 }, async () => { dbA = new Database(join(tmpDir, "disc-a.db")); dbB = new Database(join(tmpDir, "disc-b.db")); nodeA = await createTestHeliaNode(dbA); nodeB = await createTestHeliaNode(dbB); const peerIdA = nodeA.libp2p.peerId; const peerIdB = nodeB.libp2p.peerId; // Before connection, neither knows the other expect(nodeA.libp2p.getConnections(peerIdB)).toHaveLength(0); expect(nodeB.libp2p.getConnections(peerIdA)).toHaveLength(0); // Connect B -> A const addrsA = nodeA.libp2p.getMultiaddrs(); await nodeB.libp2p.dial(addrsA[0]!); // After connection, both should see the connection await waitFor( () => nodeA!.libp2p.getConnections(peerIdB).length > 0, 5_000, ); expect( nodeA.libp2p.getConnections(peerIdB).length, ).toBeGreaterThan(0); expect( nodeB.libp2p.getConnections(peerIdA).length, ).toBeGreaterThan(0); }); it("bidirectional block exchange works", { timeout: 60_000 }, async () => { dbA = new Database(join(tmpDir, "bidir-a.db")); dbB = new Database(join(tmpDir, "bidir-b.db")); nodeA = await createTestHeliaNode(dbA); nodeB = await createTestHeliaNode(dbB); // Connect await nodeB.libp2p.dial(nodeA.libp2p.getMultiaddrs()[0]!); await waitFor( () => nodeA!.libp2p.getConnections().length > 0 && nodeB!.libp2p.getConnections().length > 0, 5_000, ); // Store block on A, different block on B const dataA = new TextEncoder().encode("block from A"); const dataB = new TextEncoder().encode("block from B"); const cidA = await cidFromBytes(dataA); const cidB = await cidFromBytes(dataB); await nodeA.blockstore.put(cidA, dataA); await nodeB.blockstore.put(cidB, dataB); const signal = AbortSignal.timeout(15_000); // B fetches A's block const fetchedFromA = await collectBytes( nodeB.blockstore.get(cidA, { signal }) as any, ); expect(fetchedFromA).toEqual(dataA); // A fetches B's block const fetchedFromB = await collectBytes( nodeA.blockstore.get(cidB, { signal }) as any, ); expect(fetchedFromB).toEqual(dataB); }); });