atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 223 lines 7.7 kB view raw
/**
 * Demo script: starts two nodes, creates records on Node A,
 * replicates to Node B, then leaves Node B running so you
 * can inspect the app with real metrics.
 *
 * Usage: npx tsx scripts/demo-replication.ts
 */

import { mkdtempSync, mkdirSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createServer } from "node:http";
import Database from "better-sqlite3";
import { getRequestListener } from "@hono/node-server";
import pc from "picocolors";

import { IpfsService } from "../src/ipfs.js";
import { RepoManager } from "../src/repo-manager.js";
import { ReplicationManager } from "../src/replication/replication-manager.js";
import { ReplicatedRepoReader } from "../src/replication/replicated-repo-reader.js";
import { Firehose } from "../src/firehose.js";
import { createApp } from "../src/index.js";
import type { Config } from "../src/config.js";
import type { NetworkService } from "../src/ipfs.js";

// Fresh temp directory per run; each node gets its own subdirectory below it.
const DEMO_DIR = mkdtempSync(join(tmpdir(), "p2pds-demo-"));
const NODE_A_DID = "did:plc:demosource123";
const NODE_B_DID = "did:plc:demoreplicator456";
const PORT_A = 3100;
const PORT_B = 3000;

/**
 * Builds a `Config` for one demo node using fixed demo credentials.
 * IPFS is enabled, while IPFS networking and the firehose are disabled.
 *
 * @param dataDir - Directory used as the node's `DATA_DIR`.
 * @param did - DID the node runs as.
 * @param replicateDids - DIDs placed in `REPLICATE_DIDS` (none by default).
 * @param port - Value placed in `PORT` (0 by default).
 * @returns The assembled configuration object.
 */
function makeConfig(
  dataDir: string,
  did: string,
  replicateDids: string[] = [],
  port = 0,
): Config {
  return {
    DID: did,
    HANDLE: "demo.localhost",
    PDS_HOSTNAME: "demo.localhost",
    AUTH_TOKEN: "demo-token",
    SIGNING_KEY: "0000000000000000000000000000000000000000000000000000000000000001",
    SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr",
    JWT_SECRET: "demo-jwt-secret",
    PASSWORD_HASH: "$2a$10$test",
    DATA_DIR: dataDir,
    PORT: port,
    IPFS_ENABLED: true,
    IPFS_NETWORKING: false,
    REPLICATE_DIDS: replicateDids,
    FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos",
    FIREHOSE_ENABLED: false,
  };
}

/**
 * Stand-in `NetworkService` for the demo: every publish/subscribe method is a
 * no-op and every getter returns a fixed value.
 */
const mockNetworkService: NetworkService = {
  provideBlocks: async () => {},
  publishCommitNotification: async () => {},
  onCommitNotification: () => {},
  subscribeCommitTopics: () => {},
  unsubscribeCommitTopics: () => {},
  getPeerId: () => "12D3KooWDemoPeer",
  getMultiaddrs: () => ["/ip4/127.0.0.1/tcp/4001"],
  getConnectionCount: () => 1,
  getRemoteAddrs: () => [],
  publishIdentityNotification: async () => {},
  onIdentityNotification: () => {},
  subscribeIdentityTopics: () => {},
  unsubscribeIdentityTopics: () => {},
};

/**
 * Serves `app` over HTTP on 127.0.0.1 at the given port.
 *
 * @param app - Application returned by `createApp`; its `fetch` handler is
 *   adapted with `getRequestListener`.
 * @param port - TCP port to bind.
 * @returns A promise that resolves with the server once it is listening and
 *   rejects if the server emits `error` before that (e.g. port already in use).
 */
function startServer(
  app: ReturnType<typeof createApp>,
  port: number,
): Promise<ReturnType<typeof createServer>> {
  return new Promise((resolve, reject) => {
    const listener = getRequestListener(app.fetch);
    const server = createServer(listener);
    // Without an 'error' listener a bind failure is thrown as an uncaught
    // exception instead of reaching the caller's error handling.
    server.once("error", reject);
    server.listen(port, "127.0.0.1", () => {
      server.off("error", reject);
      resolve(server);
    });
  });
}

/**
 * Runs the demo end to end: populates Node A with records, serves it, starts
 * Node B configured to replicate Node A, triggers a sync, prints the metrics
 * read from Node B's sync storage, and installs a Ctrl+C handler that shuts
 * everything down.
 */
async function main(): Promise<void> {
  console.log(pc.bold("\n=== P2PDS Replication Demo ===\n"));
  console.log(pc.dim(`Temp dir: ${DEMO_DIR}`));

  // ---- Node A: source PDS ----
  console.log(pc.cyan("\n[Node A] Starting source PDS..."));
  const dirA = join(DEMO_DIR, "a");
  mkdirSync(dirA, { recursive: true });
  const dbA = new Database(join(dirA, "pds.db"));
  const configA = makeConfig(dirA, NODE_A_DID, [], PORT_A);
  const repoManagerA = new RepoManager(dbA, configA);
  repoManagerA.init();

  // Create test records
  const recordCount = 15;
  const likeCount = 3;
  const phrases = [
    "hello world",
    "testing replication",
    "p2p is cool",
    "atproto rocks",
    "decentralize all the things",
  ];
  console.log(pc.dim(` Creating ${recordCount} test records...`));
  for (let i = 0; i < recordCount; i++) {
    await repoManagerA.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: `Demo post #${i + 1}: ${phrases[i % phrases.length]}`,
      // Timestamps are spaced one minute apart, oldest first.
      createdAt: new Date(Date.now() - (recordCount - i) * 60000).toISOString(),
    });
  }
  // Also create a few like records so a second collection exists. The subject
  // URIs and CID are placeholders; they are not taken from the posts above.
  for (let i = 0; i < likeCount; i++) {
    await repoManagerA.createRecord("app.bsky.feed.like", undefined, {
      $type: "app.bsky.feed.like",
      subject: { uri: `at://${NODE_A_DID}/app.bsky.feed.post/rec${i}`, cid: "bafytest" },
      createdAt: new Date().toISOString(),
    });
  }
  console.log(pc.dim(` Created ${recordCount + likeCount} records across 2 collections`));

  const firehoseA = new Firehose(repoManagerA);
  const appA = createApp(configA, repoManagerA, firehoseA);
  const serverA = await startServer(appA, PORT_A);
  console.log(pc.green(` Node A running at http://127.0.0.1:${PORT_A}`));

  // ---- Node B: replicator ----
  console.log(pc.cyan("\n[Node B] Starting replicator..."));
  const dirB = join(DEMO_DIR, "b");
  mkdirSync(dirB, { recursive: true });
  const dbB = new Database(join(dirB, "pds.db"));
  const configB = makeConfig(dirB, NODE_B_DID, [NODE_A_DID], PORT_B);

  const ipfsB = new IpfsService({
    blocksPath: join(dirB, "ipfs-blocks"),
    datastorePath: join(dirB, "ipfs-datastore"),
    networking: false,
  });
  await ipfsB.start();

  const repoManagerB = new RepoManager(dbB, configB);
  repoManagerB.init(undefined, ipfsB, ipfsB);

  // Minimal DID resolver: only Node A's DID resolves, and it points at the
  // local Node A HTTP server. Every other DID resolves to null.
  const didResolver = {
    resolve: async (did: string) => {
      if (did === NODE_A_DID) {
        return {
          id: NODE_A_DID,
          service: [{
            id: "#atproto_pds",
            type: "AtprotoPersonalDataServer",
            serviceEndpoint: `http://127.0.0.1:${PORT_A}`,
          }],
        };
      }
      return null;
    },
  };

  const replicationManager = new ReplicationManager(
    dbB, configB, repoManagerB, ipfsB, mockNetworkService,
    // NOTE(review): the stub is cast because it only provides `resolve`;
    // confirm against the resolver type ReplicationManager expects.
    didResolver as any,
  );
  const replicatedRepoReader = new ReplicatedRepoReader(
    ipfsB, replicationManager.getSyncStorage(),
  );
  replicationManager.setReplicatedRepoReader(replicatedRepoReader);
  await replicationManager.init();

  const firehoseB = new Firehose(repoManagerB);
  const appB = createApp(
    configB, repoManagerB, firehoseB,
    ipfsB, mockNetworkService, undefined,
    replicationManager, replicatedRepoReader,
  );
  const serverB = await startServer(appB, PORT_B);
  console.log(pc.green(` Node B running at http://127.0.0.1:${PORT_B}`));

  // ---- Trigger replication ----
  console.log(pc.cyan("\n[Sync] Replicating Node A → Node B..."));
  await replicationManager.syncAll();
  console.log(pc.green(" Sync complete!"));

  // Print summary
  const syncStorage = replicationManager.getSyncStorage();
  const aggregate = syncStorage.getAggregateMetrics();
  const didMetrics = syncStorage.getDidMetrics(NODE_A_DID);
  const history = syncStorage.getSyncHistory(undefined, 5);

  console.log(pc.cyan("\n[Metrics Summary]"));
  console.log(pc.dim(` Blocks: ${aggregate.totalBlocks}`));
  console.log(pc.dim(` Blobs: ${aggregate.totalBlobs}`));
  console.log(pc.dim(` Records: ${aggregate.totalRecords}`));
  console.log(pc.dim(` Bytes held: ${aggregate.totalBytesHeld}`));
  console.log(pc.dim(` Syncs: ${aggregate.totalSyncs}`));
  console.log(pc.dim(` 24h transfer: ${aggregate.recentTransferredBytes} bytes`));

  console.log(pc.cyan(`\n[Per-DID: ${NODE_A_DID}]`));
  console.log(pc.dim(` Blocks: ${didMetrics.blocks}`));
  console.log(pc.dim(` Records: ${didMetrics.records}`));
  console.log(pc.dim(` Bytes: ${didMetrics.bytesHeld}`));

  if (history.length > 0) {
    console.log(pc.cyan("\n[Recent Sync History]"));
    for (const h of history) {
      console.log(pc.dim(` ${h.sourceType} | ${h.status} | +${h.blocksAdded} blocks | ${h.carBytes} bytes | ${h.durationMs}ms`));
    }
  }

  console.log(pc.bold(pc.green(`\n✓ App ready at: http://127.0.0.1:${PORT_B}/`)));
  console.log(pc.dim(" Auth token: demo-token"));
  console.log(pc.dim(" Press Ctrl+C to stop\n"));

  // Keep running until Ctrl+C. `once` means a second Ctrl+C is not routed back
  // into this handler while shutdown is already in progress.
  process.once("SIGINT", () => {
    console.log(pc.dim("\nShutting down..."));
    replicationManager.stop();
    serverB.close();
    serverA.close();
    void (async () => {
      let exitCode = 0;
      try {
        await ipfsB.stop();
      } catch (err) {
        console.error("Failed to stop IPFS cleanly:", err);
        exitCode = 1;
      } finally {
        // Close the databases even when stopping IPFS failed.
        dbB.close();
        dbA.close();
      }
      process.exit(exitCode);
    })();
  });
}

main().catch((err) => {
  console.error("Demo failed:", err);
  process.exit(1);
});