/**
 * Demo script: starts two nodes, creates records on Node A,
 * replicates to Node B, then leaves Node B running so you
 * can inspect the app with real metrics.
 *
 * Usage: npx tsx scripts/demo-replication.ts
 */
import { mkdtempSync, mkdirSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createServer } from "node:http";
import Database from "better-sqlite3";
import { getRequestListener } from "@hono/node-server";
import pc from "picocolors";
import { IpfsService } from "../src/ipfs.js";
import { RepoManager } from "../src/repo-manager.js";
import { ReplicationManager } from "../src/replication/replication-manager.js";
import { ReplicatedRepoReader } from "../src/replication/replicated-repo-reader.js";
import { Firehose } from "../src/firehose.js";
import { createApp } from "../src/index.js";
import type { Config } from "../src/config.js";
import type { NetworkService } from "../src/ipfs.js";

// Fresh temp directory per run; each node gets its own subdirectory below it.
const DEMO_DIR = mkdtempSync(join(tmpdir(), "p2pds-demo-"));
const NODE_A_DID = "did:plc:demosource123";
const NODE_B_DID = "did:plc:demoreplicator456";
const PORT_A = 3100;
const PORT_B = 3000;

/**
 * Builds a demo {@link Config} with fixed placeholder credentials.
 *
 * IPFS is enabled with networking turned off and the firehose is disabled,
 * so the demo runs entirely on the local machine.
 *
 * @param dataDir - Directory used as the node's `DATA_DIR`.
 * @param did - DID the node identifies as.
 * @param replicateDids - DIDs this node should replicate (none by default).
 * @param port - Value stored in `PORT` (defaults to 0).
 * @returns The assembled configuration object.
 */
function makeConfig(
  dataDir: string,
  did: string,
  replicateDids: string[] = [],
  port = 0,
): Config {
  return {
    DID: did,
    HANDLE: "demo.localhost",
    PDS_HOSTNAME: "demo.localhost",
    AUTH_TOKEN: "demo-token",
    SIGNING_KEY: "0000000000000000000000000000000000000000000000000000000000000001",
    SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr",
    JWT_SECRET: "demo-jwt-secret",
    PASSWORD_HASH: "$2a$10$test",
    DATA_DIR: dataDir,
    PORT: port,
    IPFS_ENABLED: true,
    IPFS_NETWORKING: false,
    REPLICATE_DIDS: replicateDids,
    FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos",
    FIREHOSE_ENABLED: false,
  };
}

/**
 * Stand-in {@link NetworkService}: every publish/subscribe hook is a no-op and
 * the getters return fixed values, so no real peer-to-peer traffic happens.
 */
const mockNetworkService: NetworkService = {
  provideBlocks: async () => {},
  publishCommitNotification: async () => {},
  onCommitNotification: () => {},
  subscribeCommitTopics: () => {},
  unsubscribeCommitTopics: () => {},
  getPeerId: () => "12D3KooWDemoPeer",
  getMultiaddrs: () => ["/ip4/127.0.0.1/tcp/4001"],
  getConnectionCount: () => 1,
  getRemoteAddrs: () => [],
  publishIdentityNotification: async () => {},
  onIdentityNotification: () => {},
  subscribeIdentityTopics: () => {},
  unsubscribeIdentityTopics: () => {},
};

/**
 * Serves `app` over HTTP on 127.0.0.1 at the given port.
 *
 * @param app - Application returned by `createApp`; its `fetch` handler is
 *   adapted to a Node request listener.
 * @param port - TCP port to listen on.
 * @returns A promise that resolves with the server once it is listening and
 *   rejects if the server emits an error before that (e.g. port in use).
 */
function startServer(
  app: ReturnType<typeof createApp>,
  port: number,
): Promise<ReturnType<typeof createServer>> {
  return new Promise((resolve, reject) => {
    const listener = getRequestListener(app.fetch);
    const server = createServer(listener);
    // Without this listener a failed listen() would surface as an unhandled
    // 'error' event instead of reaching the caller.
    server.once("error", reject);
    server.listen(port, "127.0.0.1", () => {
      server.off("error", reject);
      resolve(server);
    });
  });
}

/**
 * Runs the demo end to end: seeds Node A with records, starts both nodes,
 * syncs Node A into Node B, prints the resulting metrics, and then keeps the
 * process alive until SIGINT.
 */
async function main(): Promise<void> {
  console.log(pc.bold("\n=== P2PDS Replication Demo ===\n"));
  console.log(pc.dim(`Temp dir: ${DEMO_DIR}`));

  // ---- Node A: source PDS ----
  console.log(pc.cyan("\n[Node A] Starting source PDS..."));
  const dirA = join(DEMO_DIR, "a");
  mkdirSync(dirA, { recursive: true });
  const dbA = new Database(join(dirA, "pds.db"));
  const configA = makeConfig(dirA, NODE_A_DID, [], PORT_A);
  const repoManagerA = new RepoManager(dbA, configA);
  repoManagerA.init();

  // Create test records
  const recordCount = 15;
  const sampleTexts = [
    "hello world",
    "testing replication",
    "p2p is cool",
    "atproto rocks",
    "decentralize all the things",
  ];
  console.log(pc.dim(` Creating ${recordCount} test records...`));
  for (let i = 0; i < recordCount; i++) {
    await repoManagerA.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: `Demo post #${i + 1}: ${sampleTexts[i % sampleTexts.length]}`,
      // Spread timestamps one minute apart, oldest first.
      createdAt: new Date(Date.now() - (recordCount - i) * 60000).toISOString(),
    });
  }

  // Also create a few like records so the repo spans a second collection.
  // The subject URI and CID are placeholder values.
  for (let i = 0; i < 3; i++) {
    await repoManagerA.createRecord("app.bsky.feed.like", undefined, {
      $type: "app.bsky.feed.like",
      subject: { uri: `at://${NODE_A_DID}/app.bsky.feed.post/rec${i}`, cid: "bafytest" },
      createdAt: new Date().toISOString(),
    });
  }
  console.log(pc.dim(` Created ${recordCount + 3} records across 2 collections`));

  const firehoseA = new Firehose(repoManagerA);
  const appA = createApp(configA, repoManagerA, firehoseA);
  const serverA = await startServer(appA, PORT_A);
  console.log(pc.green(` Node A running at http://127.0.0.1:${PORT_A}`));

  // ---- Node B: replicator ----
  console.log(pc.cyan("\n[Node B] Starting replicator..."));
  const dirB = join(DEMO_DIR, "b");
  mkdirSync(dirB, { recursive: true });
  const dbB = new Database(join(dirB, "pds.db"));
  const configB = makeConfig(dirB, NODE_B_DID, [NODE_A_DID], PORT_B);

  const ipfsB = new IpfsService({
    blocksPath: join(dirB, "ipfs-blocks"),
    datastorePath: join(dirB, "ipfs-datastore"),
    networking: false,
  });
  await ipfsB.start();

  const repoManagerB = new RepoManager(dbB, configB);
  repoManagerB.init(undefined, ipfsB, ipfsB);

  // Minimal resolver: only Node A's DID is known, and it points at the local
  // Node A HTTP server started above.
  const didResolver = {
    resolve: async (did: string) => {
      if (did === NODE_A_DID) {
        return {
          id: NODE_A_DID,
          service: [
            {
              id: "#atproto_pds",
              type: "AtprotoPersonalDataServer",
              serviceEndpoint: `http://127.0.0.1:${PORT_A}`,
            },
          ],
        };
      }
      return null;
    },
  };

  const replicationManager = new ReplicationManager(
    dbB,
    configB,
    repoManagerB,
    ipfsB,
    mockNetworkService,
    // NOTE(review): cast hides any mismatch with the resolver type the
    // manager expects — confirm against ReplicationManager's signature.
    didResolver as any,
  );
  const replicatedRepoReader = new ReplicatedRepoReader(
    ipfsB,
    replicationManager.getSyncStorage(),
  );
  replicationManager.setReplicatedRepoReader(replicatedRepoReader);
  await replicationManager.init();

  const firehoseB = new Firehose(repoManagerB);
  const appB = createApp(
    configB,
    repoManagerB,
    firehoseB,
    ipfsB,
    mockNetworkService,
    undefined,
    replicationManager,
    replicatedRepoReader,
  );
  const serverB = await startServer(appB, PORT_B);
  console.log(pc.green(` Node B running at http://127.0.0.1:${PORT_B}`));

  // ---- Trigger replication ----
  console.log(pc.cyan("\n[Sync] Replicating Node A → Node B..."));
  await replicationManager.syncAll();
  console.log(pc.green(" Sync complete!"));

  // Print summary
  const syncStorage = replicationManager.getSyncStorage();
  const aggregate = syncStorage.getAggregateMetrics();
  const didMetrics = syncStorage.getDidMetrics(NODE_A_DID);
  const history = syncStorage.getSyncHistory(undefined, 5);

  console.log(pc.cyan("\n[Metrics Summary]"));
  console.log(pc.dim(` Blocks: ${aggregate.totalBlocks}`));
  console.log(pc.dim(` Blobs: ${aggregate.totalBlobs}`));
  console.log(pc.dim(` Records: ${aggregate.totalRecords}`));
  console.log(pc.dim(` Bytes held: ${aggregate.totalBytesHeld}`));
  console.log(pc.dim(` Syncs: ${aggregate.totalSyncs}`));
  console.log(pc.dim(` 24h transfer: ${aggregate.recentTransferredBytes} bytes`));

  console.log(pc.cyan(`\n[Per-DID: ${NODE_A_DID}]`));
  console.log(pc.dim(` Blocks: ${didMetrics.blocks}`));
  console.log(pc.dim(` Records: ${didMetrics.records}`));
  console.log(pc.dim(` Bytes: ${didMetrics.bytesHeld}`));

  if (history.length > 0) {
    console.log(pc.cyan("\n[Recent Sync History]"));
    for (const h of history) {
      console.log(pc.dim(` ${h.sourceType} | ${h.status} | +${h.blocksAdded} blocks | ${h.carBytes} bytes | ${h.durationMs}ms`));
    }
  }

  console.log(pc.bold(pc.green(`\n✓ App ready at: http://127.0.0.1:${PORT_B}/`)));
  console.log(pc.dim(" Auth token: demo-token"));
  console.log(pc.dim(" Press Ctrl+C to stop\n"));

  // Stops replication and both HTTP servers, then IPFS, then closes the
  // databases. The databases are closed and the process exits even if
  // stopping IPFS fails; in that case the exit code is 1.
  const shutdown = async (): Promise<void> => {
    console.log(pc.dim("\nShutting down..."));
    replicationManager.stop();
    serverB.close();
    serverA.close();
    let exitCode = 0;
    try {
      await ipfsB.stop();
    } catch (err: unknown) {
      console.error("Failed to stop IPFS cleanly:", err);
      exitCode = 1;
    } finally {
      dbB.close();
      dbA.close();
    }
    process.exit(exitCode);
  };

  // Keep running
  process.on("SIGINT", () => {
    void shutdown();
  });
}

main().catch((err: unknown) => {
  console.error("Demo failed:", err);
  process.exit(1);
});