atproto user agency toolkit for individuals and groups
1/**
2 * Demo script: starts two nodes, creates records on Node A,
3 * replicates to Node B, then leaves Node B running so you
4 * can inspect the app with real metrics.
5 *
6 * Usage: npx tsx scripts/demo-replication.ts
7 */
8
9import { mkdtempSync, mkdirSync } from "node:fs";
10import { tmpdir } from "node:os";
11import { join } from "node:path";
12import { createServer } from "node:http";
13import Database from "better-sqlite3";
14import { getRequestListener } from "@hono/node-server";
15import pc from "picocolors";
16
17import { IpfsService } from "../src/ipfs.js";
18import { RepoManager } from "../src/repo-manager.js";
19import { ReplicationManager } from "../src/replication/replication-manager.js";
20import { ReplicatedRepoReader } from "../src/replication/replicated-repo-reader.js";
21import { Firehose } from "../src/firehose.js";
22import { createApp } from "../src/index.js";
23import type { Config } from "../src/config.js";
24import type { NetworkService } from "../src/ipfs.js";
25
26const DEMO_DIR = mkdtempSync(join(tmpdir(), "p2pds-demo-"));
27const NODE_A_DID = "did:plc:demosource123";
28const NODE_B_DID = "did:plc:demoreplicator456";
29const PORT_A = 3100;
30const PORT_B = 3000;
31
32function makeConfig(
33 dataDir: string,
34 did: string,
35 replicateDids: string[] = [],
36 port: number = 0,
37): Config {
38 return {
39 DID: did,
40 HANDLE: "demo.localhost",
41 PDS_HOSTNAME: "demo.localhost",
42 AUTH_TOKEN: "demo-token",
43 SIGNING_KEY: "0000000000000000000000000000000000000000000000000000000000000001",
44 SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr",
45 JWT_SECRET: "demo-jwt-secret",
46 PASSWORD_HASH: "$2a$10$test",
47 DATA_DIR: dataDir,
48 PORT: port,
49 IPFS_ENABLED: true,
50 IPFS_NETWORKING: false,
51 REPLICATE_DIDS: replicateDids,
52 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos",
53 FIREHOSE_ENABLED: false,
54 };
55}
56
57const mockNetworkService: NetworkService = {
58 provideBlocks: async () => {},
59 publishCommitNotification: async () => {},
60 onCommitNotification: () => {},
61 subscribeCommitTopics: () => {},
62 unsubscribeCommitTopics: () => {},
63 getPeerId: () => "12D3KooWDemoPeer",
64 getMultiaddrs: () => ["/ip4/127.0.0.1/tcp/4001"],
65 getConnectionCount: () => 1,
66 getRemoteAddrs: () => [],
67 publishIdentityNotification: async () => {},
68 onIdentityNotification: () => {},
69 subscribeIdentityTopics: () => {},
70 unsubscribeIdentityTopics: () => {},
71};
72
73function startServer(app: ReturnType<typeof createApp>, port: number): Promise<ReturnType<typeof createServer>> {
74 return new Promise((resolve) => {
75 const listener = getRequestListener(app.fetch);
76 const server = createServer(listener);
77 server.listen(port, "127.0.0.1", () => resolve(server));
78 });
79}
80
81async function main() {
82 console.log(pc.bold("\n=== P2PDS Replication Demo ===\n"));
83 console.log(pc.dim(`Temp dir: ${DEMO_DIR}`));
84
85 // ---- Node A: source PDS ----
86 console.log(pc.cyan("\n[Node A] Starting source PDS..."));
87 const dirA = join(DEMO_DIR, "a");
88 mkdirSync(dirA, { recursive: true });
89 const dbA = new Database(join(dirA, "pds.db"));
90 const configA = makeConfig(dirA, NODE_A_DID, [], PORT_A);
91 const repoManagerA = new RepoManager(dbA, configA);
92 repoManagerA.init();
93
94 // Create test records
95 const recordCount = 15;
96 console.log(pc.dim(` Creating ${recordCount} test records...`));
97 for (let i = 0; i < recordCount; i++) {
98 await repoManagerA.createRecord("app.bsky.feed.post", undefined, {
99 $type: "app.bsky.feed.post",
100 text: `Demo post #${i + 1}: ${["hello world", "testing replication", "p2p is cool", "atproto rocks", "decentralize all the things"][i % 5]}`,
101 createdAt: new Date(Date.now() - (recordCount - i) * 60000).toISOString(),
102 });
103 }
104 // Also create some profile-like records
105 for (let i = 0; i < 3; i++) {
106 await repoManagerA.createRecord("app.bsky.feed.like", undefined, {
107 $type: "app.bsky.feed.like",
108 subject: { uri: `at://${NODE_A_DID}/app.bsky.feed.post/rec${i}`, cid: "bafytest" },
109 createdAt: new Date().toISOString(),
110 });
111 }
112 console.log(pc.dim(` Created ${recordCount + 3} records across 2 collections`));
113
114 const firehoseA = new Firehose(repoManagerA);
115 const appA = createApp(configA, repoManagerA, firehoseA);
116 const serverA = await startServer(appA, PORT_A);
117 console.log(pc.green(` Node A running at http://127.0.0.1:${PORT_A}`));
118
119 // ---- Node B: replicator ----
120 console.log(pc.cyan("\n[Node B] Starting replicator..."));
121 const dirB = join(DEMO_DIR, "b");
122 mkdirSync(dirB, { recursive: true });
123 const dbB = new Database(join(dirB, "pds.db"));
124 const configB = makeConfig(dirB, NODE_B_DID, [NODE_A_DID], PORT_B);
125
126 const ipfsB = new IpfsService({
127 blocksPath: join(dirB, "ipfs-blocks"),
128 datastorePath: join(dirB, "ipfs-datastore"),
129 networking: false,
130 });
131 await ipfsB.start();
132
133 const repoManagerB = new RepoManager(dbB, configB);
134 repoManagerB.init(undefined, ipfsB, ipfsB);
135
136 const didResolver = {
137 resolve: async (did: string) => {
138 if (did === NODE_A_DID) {
139 return {
140 id: NODE_A_DID,
141 service: [{
142 id: "#atproto_pds",
143 type: "AtprotoPersonalDataServer",
144 serviceEndpoint: `http://127.0.0.1:${PORT_A}`,
145 }],
146 };
147 }
148 return null;
149 },
150 };
151
152 const replicationManager = new ReplicationManager(
153 dbB, configB, repoManagerB, ipfsB, mockNetworkService,
154 didResolver as any,
155 );
156 const replicatedRepoReader = new ReplicatedRepoReader(
157 ipfsB, replicationManager.getSyncStorage(),
158 );
159 replicationManager.setReplicatedRepoReader(replicatedRepoReader);
160 await replicationManager.init();
161
162 const firehoseB = new Firehose(repoManagerB);
163 const appB = createApp(
164 configB, repoManagerB, firehoseB,
165 ipfsB, mockNetworkService, undefined,
166 replicationManager, replicatedRepoReader,
167 );
168 const serverB = await startServer(appB, PORT_B);
169 console.log(pc.green(` Node B running at http://127.0.0.1:${PORT_B}`));
170
171 // ---- Trigger replication ----
172 console.log(pc.cyan("\n[Sync] Replicating Node A → Node B..."));
173 await replicationManager.syncAll();
174 console.log(pc.green(" Sync complete!"));
175
176 // Print summary
177 const syncStorage = replicationManager.getSyncStorage();
178 const aggregate = syncStorage.getAggregateMetrics();
179 const didMetrics = syncStorage.getDidMetrics(NODE_A_DID);
180 const history = syncStorage.getSyncHistory(undefined, 5);
181
182 console.log(pc.cyan("\n[Metrics Summary]"));
183 console.log(pc.dim(` Blocks: ${aggregate.totalBlocks}`));
184 console.log(pc.dim(` Blobs: ${aggregate.totalBlobs}`));
185 console.log(pc.dim(` Records: ${aggregate.totalRecords}`));
186 console.log(pc.dim(` Bytes held: ${aggregate.totalBytesHeld}`));
187 console.log(pc.dim(` Syncs: ${aggregate.totalSyncs}`));
188 console.log(pc.dim(` 24h transfer: ${aggregate.recentTransferredBytes} bytes`));
189
190 console.log(pc.cyan(`\n[Per-DID: ${NODE_A_DID}]`));
191 console.log(pc.dim(` Blocks: ${didMetrics.blocks}`));
192 console.log(pc.dim(` Records: ${didMetrics.records}`));
193 console.log(pc.dim(` Bytes: ${didMetrics.bytesHeld}`));
194
195 if (history.length > 0) {
196 console.log(pc.cyan("\n[Recent Sync History]"));
197 for (const h of history) {
198 console.log(pc.dim(` ${h.sourceType} | ${h.status} | +${h.blocksAdded} blocks | ${h.carBytes} bytes | ${h.durationMs}ms`));
199 }
200 }
201
202 console.log(pc.bold(pc.green(`\n✓ App ready at: http://127.0.0.1:${PORT_B}/`)));
203 console.log(pc.dim(" Auth token: demo-token"));
204 console.log(pc.dim(" Press Ctrl+C to stop\n"));
205
206 // Keep running
207 process.on("SIGINT", () => {
208 console.log(pc.dim("\nShutting down..."));
209 replicationManager.stop();
210 serverB.close();
211 serverA.close();
212 ipfsB.stop().then(() => {
213 dbB.close();
214 dbA.close();
215 process.exit(0);
216 });
217 });
218}
219
220main().catch((err) => {
221 console.error("Demo failed:", err);
222 process.exit(1);
223});