atproto user agency toolkit for individuals and groups
7
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add replication demo script for dashboard validation

Two-node demo: creates records on Node A, replicates to Node B,
leaves Node B running with dashboard visible at localhost:3000.

+223
+223
scripts/demo-replication.ts
··· 1 + /** 2 + * Demo script: starts two nodes, creates records on Node A, 3 + * replicates to Node B, then leaves Node B running so you 4 + * can inspect the admin dashboard with real metrics. 5 + * 6 + * Usage: npx tsx scripts/demo-replication.ts 7 + */ 8 + 9 + import { mkdtempSync, mkdirSync } from "node:fs"; 10 + import { tmpdir } from "node:os"; 11 + import { join } from "node:path"; 12 + import { createServer } from "node:http"; 13 + import Database from "better-sqlite3"; 14 + import { getRequestListener } from "@hono/node-server"; 15 + import pc from "picocolors"; 16 + 17 + import { IpfsService } from "../src/ipfs.js"; 18 + import { RepoManager } from "../src/repo-manager.js"; 19 + import { ReplicationManager } from "../src/replication/replication-manager.js"; 20 + import { ReplicatedRepoReader } from "../src/replication/replicated-repo-reader.js"; 21 + import { Firehose } from "../src/firehose.js"; 22 + import { createApp } from "../src/index.js"; 23 + import type { Config } from "../src/config.js"; 24 + import type { NetworkService } from "../src/ipfs.js"; 25 + 26 + const DEMO_DIR = mkdtempSync(join(tmpdir(), "p2pds-demo-")); 27 + const NODE_A_DID = "did:plc:demosource123"; 28 + const NODE_B_DID = "did:plc:demoreplicator456"; 29 + const PORT_A = 3100; 30 + const PORT_B = 3000; 31 + 32 + function makeConfig( 33 + dataDir: string, 34 + did: string, 35 + replicateDids: string[] = [], 36 + port: number = 0, 37 + ): Config { 38 + return { 39 + DID: did, 40 + HANDLE: "demo.localhost", 41 + PDS_HOSTNAME: "demo.localhost", 42 + AUTH_TOKEN: "demo-token", 43 + SIGNING_KEY: "0000000000000000000000000000000000000000000000000000000000000001", 44 + SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr", 45 + JWT_SECRET: "demo-jwt-secret", 46 + PASSWORD_HASH: "$2a$10$test", 47 + DATA_DIR: dataDir, 48 + PORT: port, 49 + IPFS_ENABLED: true, 50 + IPFS_NETWORKING: false, 51 + REPLICATE_DIDS: replicateDids, 52 + FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 53 + FIREHOSE_ENABLED: false, 54 + }; 55 + } 56 + 57 + const mockNetworkService: NetworkService = { 58 + provideBlocks: async () => {}, 59 + publishCommitNotification: async () => {}, 60 + onCommitNotification: () => {}, 61 + subscribeCommitTopics: () => {}, 62 + unsubscribeCommitTopics: () => {}, 63 + getPeerId: () => "12D3KooWDemoPeer", 64 + getMultiaddrs: () => ["/ip4/127.0.0.1/tcp/4001"], 65 + getConnectionCount: () => 1, 66 + getRemoteAddrs: () => [], 67 + publishIdentityNotification: async () => {}, 68 + onIdentityNotification: () => {}, 69 + subscribeIdentityTopics: () => {}, 70 + unsubscribeIdentityTopics: () => {}, 71 + }; 72 + 73 + function startServer(app: ReturnType<typeof createApp>, port: number): Promise<ReturnType<typeof createServer>> { 74 + return new Promise((resolve) => { 75 + const listener = getRequestListener(app.fetch); 76 + const server = createServer(listener); 77 + server.listen(port, "127.0.0.1", () => resolve(server)); 78 + }); 79 + } 80 + 81 + async function main() { 82 + console.log(pc.bold("\n=== P2PDS Replication Demo ===\n")); 83 + console.log(pc.dim(`Temp dir: ${DEMO_DIR}`)); 84 + 85 + // ---- Node A: source PDS ---- 86 + console.log(pc.cyan("\n[Node A] Starting source PDS...")); 87 + const dirA = join(DEMO_DIR, "a"); 88 + mkdirSync(dirA, { recursive: true }); 89 + const dbA = new Database(join(dirA, "pds.db")); 90 + const configA = makeConfig(dirA, NODE_A_DID, [], PORT_A); 91 + const repoManagerA = new RepoManager(dbA, configA); 92 + repoManagerA.init(); 93 + 94 + // Create test records 95 + const recordCount = 15; 96 + console.log(pc.dim(` Creating ${recordCount} test records...`)); 97 + for (let i = 0; i < recordCount; i++) { 98 + await repoManagerA.createRecord("app.bsky.feed.post", undefined, { 99 + $type: "app.bsky.feed.post", 100 + text: `Demo post #${i + 1}: ${["hello world", "testing replication", "p2p is cool", "atproto rocks", "decentralize all the things"][i % 5]}`, 101 + createdAt: new Date(Date.now() - (recordCount - i) * 60000).toISOString(), 102 + }); 103 + } 104 + // Also create some profile-like records 105 + for (let i = 0; i < 3; i++) { 106 + await repoManagerA.createRecord("app.bsky.feed.like", undefined, { 107 + $type: "app.bsky.feed.like", 108 + subject: { uri: `at://${NODE_A_DID}/app.bsky.feed.post/rec${i}`, cid: "bafytest" }, 109 + createdAt: new Date().toISOString(), 110 + }); 111 + } 112 + console.log(pc.dim(` Created ${recordCount + 3} records across 2 collections`)); 113 + 114 + const firehoseA = new Firehose(repoManagerA); 115 + const appA = createApp(configA, repoManagerA, firehoseA); 116 + const serverA = await startServer(appA, PORT_A); 117 + console.log(pc.green(` Node A running at http://127.0.0.1:${PORT_A}`)); 118 + 119 + // ---- Node B: replicator ---- 120 + console.log(pc.cyan("\n[Node B] Starting replicator...")); 121 + const dirB = join(DEMO_DIR, "b"); 122 + mkdirSync(dirB, { recursive: true }); 123 + const dbB = new Database(join(dirB, "pds.db")); 124 + const configB = makeConfig(dirB, NODE_B_DID, [NODE_A_DID], PORT_B); 125 + 126 + const ipfsB = new IpfsService({ 127 + blocksPath: join(dirB, "ipfs-blocks"), 128 + datastorePath: join(dirB, "ipfs-datastore"), 129 + networking: false, 130 + }); 131 + await ipfsB.start(); 132 + 133 + const repoManagerB = new RepoManager(dbB, configB); 134 + repoManagerB.init(undefined, ipfsB, ipfsB); 135 + 136 + const didResolver = { 137 + resolve: async (did: string) => { 138 + if (did === NODE_A_DID) { 139 + return { 140 + id: NODE_A_DID, 141 + service: [{ 142 + id: "#atproto_pds", 143 + type: "AtprotoPersonalDataServer", 144 + serviceEndpoint: `http://127.0.0.1:${PORT_A}`, 145 + }], 146 + }; 147 + } 148 + return null; 149 + }, 150 + }; 151 + 152 + const replicationManager = new ReplicationManager( 153 + dbB, configB, repoManagerB, ipfsB, mockNetworkService, 154 + didResolver as any, 155 + ); 156 + const replicatedRepoReader = new ReplicatedRepoReader( 157 + ipfsB, replicationManager.getSyncStorage(), 158 + ); 159 + replicationManager.setReplicatedRepoReader(replicatedRepoReader); 160 + await replicationManager.init(); 161 + 162 + const firehoseB = new Firehose(repoManagerB); 163 + const appB = createApp( 164 + configB, repoManagerB, firehoseB, 165 + ipfsB, mockNetworkService, undefined, 166 + replicationManager, replicatedRepoReader, 167 + ); 168 + const serverB = await startServer(appB, PORT_B); 169 + console.log(pc.green(` Node B running at http://127.0.0.1:${PORT_B}`)); 170 + 171 + // ---- Trigger replication ---- 172 + console.log(pc.cyan("\n[Sync] Replicating Node A → Node B...")); 173 + await replicationManager.syncAll(); 174 + console.log(pc.green(" Sync complete!")); 175 + 176 + // Print summary 177 + const syncStorage = replicationManager.getSyncStorage(); 178 + const aggregate = syncStorage.getAggregateMetrics(); 179 + const didMetrics = syncStorage.getDidMetrics(NODE_A_DID); 180 + const history = syncStorage.getSyncHistory(undefined, 5); 181 + 182 + console.log(pc.cyan("\n[Metrics Summary]")); 183 + console.log(pc.dim(` Blocks: ${aggregate.totalBlocks}`)); 184 + console.log(pc.dim(` Blobs: ${aggregate.totalBlobs}`)); 185 + console.log(pc.dim(` Records: ${aggregate.totalRecords}`)); 186 + console.log(pc.dim(` Bytes held: ${aggregate.totalBytesHeld}`)); 187 + console.log(pc.dim(` Syncs: ${aggregate.totalSyncs}`)); 188 + console.log(pc.dim(` 24h transfer: ${aggregate.recentTransferredBytes} bytes`)); 189 + 190 + console.log(pc.cyan(`\n[Per-DID: ${NODE_A_DID}]`)); 191 + console.log(pc.dim(` Blocks: ${didMetrics.blocks}`)); 192 + console.log(pc.dim(` Records: ${didMetrics.records}`)); 193 + console.log(pc.dim(` Bytes: ${didMetrics.bytesHeld}`)); 194 + 195 + if (history.length > 0) { 196 + console.log(pc.cyan("\n[Recent Sync History]")); 197 + for (const h of history) { 198 + console.log(pc.dim(` ${h.sourceType} | ${h.status} | +${h.blocksAdded} blocks | ${h.carBytes} bytes | ${h.durationMs}ms`)); 199 + } 200 + } 201 + 202 + console.log(pc.bold(pc.green(`\n✓ Dashboard ready at: http://127.0.0.1:${PORT_B}/xrpc/org.p2pds.admin.dashboard`))); 203 + console.log(pc.dim(" Auth token: demo-token")); 204 + console.log(pc.dim(" Press Ctrl+C to stop\n")); 205 + 206 + // Keep running 207 + process.on("SIGINT", () => { 208 + console.log(pc.dim("\nShutting down...")); 209 + replicationManager.stop(); 210 + serverB.close(); 211 + serverA.close(); 212 + ipfsB.stop().then(() => { 213 + dbB.close(); 214 + dbA.close(); 215 + process.exit(0); 216 + }); 217 + }); 218 + } 219 + 220 + main().catch((err) => { 221 + console.error("Demo failed:", err); 222 + process.exit(1); 223 + });