// atproto user agency toolkit for individuals and groups
1/**
2 * Real OAuth Bidirectional Replication Smoke Test
3 *
4 * Starts two p2pds servers with IPFS networking enabled, guides user
5 * through OAuth in the browser, then automates: self-sync, peer dial,
6 * cross-sync via libp2p, verify cross-serving, cleanup.
7 *
8 * No polling or timeouts — uses property interception for OAuth identity
9 * events, promise capture for syncDid calls, and deterministic dial().
10 *
11 * Usage: npx tsx scripts/real-bidir-test.ts alice.bsky.social bob.bsky.social [--clean]
12 *
13 * Sessions persist in stable data dirs — re-runs skip OAuth and self-sync.
14 * Use --clean to wipe data dirs and disconnect after the test.
15 */
16
17import { mkdirSync, rmSync, existsSync } from "node:fs";
18import { tmpdir } from "node:os";
19import { join } from "node:path";
20import { execSync } from "node:child_process";
21import { startServer, type ServerHandle } from "../src/start.js";
22import type { Config } from "../src/config.js";
23import type { ReplicationManager } from "../src/replication/replication-manager.js";
24import type { IpfsService } from "../src/ipfs.js";
25
26// ---------------------------------------------------------------------------
27// Shared state for signal-safe cleanup
28// ---------------------------------------------------------------------------
29
30let serverA: ServerHandle | undefined;
31let serverB: ServerHandle | undefined;
32let tmpA: string | undefined;
33let tmpB: string | undefined;
34let cleanDirs = false;
35
36async function cleanup() {
37 log("Shutting down servers...");
38 if (serverA) { await serverA.close().catch(() => {}); serverA = undefined; }
39 if (serverB) { await serverB.close().catch(() => {}); serverB = undefined; }
40 if (cleanDirs) {
41 if (tmpA) { log("Cleaning up data dirs..."); rmSync(tmpA, { recursive: true, force: true }); tmpA = undefined; }
42 if (tmpB) { rmSync(tmpB, { recursive: true, force: true }); tmpB = undefined; }
43 }
44
45 // Brief settle for async gossipsub teardown
46 await new Promise((r) => setTimeout(r, 500));
47 log("Done.");
48}
49
50// Suppress gossipsub StreamStateError (async background streams during peer connect/disconnect)
51process.on("uncaughtException", (err) => {
52 if (err?.constructor?.name === "StreamStateError") return;
53 console.error("Uncaught:", err);
54 cleanup().finally(() => process.exit(1));
55});
56
57// Handle SIGINT/SIGTERM for clean shutdown when killed
58for (const sig of ["SIGINT", "SIGTERM"] as const) {
59 process.on(sig, () => {
60 log(`Caught ${sig}, cleaning up...`);
61 cleanup().finally(() => process.exit(1));
62 });
63}
64
65// ---------------------------------------------------------------------------
66// Helpers
67// ---------------------------------------------------------------------------
68
69function log(msg: string) {
70 console.log(`\x1b[36m[test]\x1b[0m ${msg}`);
71}
72
73function fail(msg: string): never {
74 console.error(`\x1b[31m[FAIL]\x1b[0m ${msg}`);
75 cleanup().finally(() => process.exit(1));
76 throw new Error("unreachable");
77}
78
79function ok(msg: string) {
80 console.log(`\x1b[32m[ OK ]\x1b[0m ${msg}`);
81}
82
83/**
84 * Intercept config.DID setter so we get an event-driven promise
85 * that resolves when the OAuth callback establishes identity.
86 * No polling — the setter fires synchronously inside the callback.
87 */
88function withIdentityPromise(config: Config): Promise<string> {
89 let _did = config.DID;
90 let _resolve: ((did: string) => void) | undefined;
91
92 const promise = new Promise<string>((resolve) => {
93 if (_did) { resolve(_did); return; }
94 _resolve = resolve;
95 });
96
97 Object.defineProperty(config, "DID", {
98 get: () => _did,
99 set: (v: string | undefined) => {
100 _did = v;
101 if (v && _resolve) {
102 _resolve(v);
103 _resolve = undefined;
104 }
105 },
106 enumerable: true,
107 configurable: true,
108 });
109
110 return promise;
111}
112
113/**
114 * Intercept syncDid on a ReplicationManager to capture per-DID promises.
115 *
116 * Returns an `awaitSync(did)` function that:
117 * - Returns the stored promise if syncDid was already called for that DID
118 * - Otherwise waits (event-driven, no polling) until syncDid is called
119 */
120function interceptSyncDid(rm: ReplicationManager) {
121 const captured = new Map<string, Promise<void>>();
122 const waiters = new Map<string, (syncPromise: Promise<void>) => void>();
123
124 const origSyncDid = rm.syncDid.bind(rm);
125 (rm as any).syncDid = (did: string) => {
126 const p = origSyncDid(did);
127 captured.set(did, p);
128 const waiter = waiters.get(did);
129 if (waiter) {
130 waiter(p);
131 waiters.delete(did);
132 }
133 return p;
134 };
135
136 return {
137 awaitSync(did: string): Promise<void> {
138 if (captured.has(did)) return captured.get(did)!;
139 return new Promise<void>((resolve, reject) => {
140 waiters.set(did, (p) => p.then(resolve, reject));
141 });
142 },
143 };
144}
145
146function makeConfig(dataDir: string, port: number): Config {
147 return {
148 PDS_HOSTNAME: `localhost:${port}`,
149 AUTH_TOKEN: "smoke-test-token",
150 JWT_SECRET: "smoke-jwt-secret",
151 PASSWORD_HASH: "$2a$10$test",
152 DATA_DIR: dataDir,
153 PORT: port,
154 IPFS_ENABLED: true,
155 IPFS_NETWORKING: true,
156 REPLICATE_DIDS: [],
157 FIREHOSE_URL: "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos",
158 FIREHOSE_ENABLED: false,
159 RATE_LIMIT_ENABLED: false,
160 RATE_LIMIT_READ_PER_MIN: 300,
161 RATE_LIMIT_SYNC_PER_MIN: 30,
162 RATE_LIMIT_SESSION_PER_MIN: 10,
163 RATE_LIMIT_WRITE_PER_MIN: 200,
164 RATE_LIMIT_CHALLENGE_PER_MIN: 20,
165 RATE_LIMIT_MAX_CONNECTIONS: 100,
166 RATE_LIMIT_FIREHOSE_PER_IP: 3,
167 OAUTH_ENABLED: true,
168 };
169}
170
171async function fetchJson<T>(url: string, opts?: RequestInit): Promise<T> {
172 const res = await fetch(url, opts);
173 if (!res.ok) {
174 const text = await res.text();
175 throw new Error(`HTTP ${res.status}: ${text}`);
176 }
177 return res.json() as Promise<T>;
178}
179
180// ---------------------------------------------------------------------------
181// Main
182// ---------------------------------------------------------------------------
183
// Per-collection cap on records fetched when verifying cross-serving reads.
const RECORD_CEILING = 10;
185
186async function main() {
187 const args = process.argv.slice(2).filter((a) => !a.startsWith("--"));
188 const flags = new Set(process.argv.slice(2).filter((a) => a.startsWith("--")));
189 cleanDirs = flags.has("--clean");
190
191 if (args.length < 2) {
192 console.log("Usage: npx tsx scripts/real-bidir-test.ts <handle-a> <handle-b> [--clean]");
193 console.log(" --clean Remove data dirs after test (default: keep for fast re-runs)");
194 console.log("Example: npx tsx scripts/real-bidir-test.ts alice.bsky.social bob.bsky.social");
195 process.exit(1);
196 }
197
198 const [handleA, handleB] = args;
199 const portA = 3100;
200 const portB = 3101;
201
202 // 1. Stable data dirs (reused across runs to keep OAuth sessions)
203 tmpA = join(tmpdir(), "p2pds-smoke-a");
204 tmpB = join(tmpdir(), "p2pds-smoke-b");
205 mkdirSync(tmpA, { recursive: true });
206 mkdirSync(tmpB, { recursive: true });
207 log(`Data dirs: ${tmpA}, ${tmpB}`);
208
209 try {
210 // 2. Start servers with identity interception
211 const configA = makeConfig(tmpA, portA);
212 const configB = makeConfig(tmpB, portB);
213 const identityA = withIdentityPromise(configA);
214 const identityB = withIdentityPromise(configB);
215
216 log("Starting Node A on port 3100...");
217 serverA = await startServer(configA);
218 log("Starting Node B on port 3101...");
219 serverB = await startServer(configB);
220 ok("Both servers started");
221
222 const rmA = serverA.replicationManager;
223 const rmB = serverB.replicationManager;
224 const ipfsA = serverA.ipfsService;
225 const ipfsB = serverB.ipfsService;
226 if (!rmA || !rmB) fail("ReplicationManager not available on one or both servers");
227 if (!ipfsA || !ipfsB) fail("IpfsService not available on one or both servers");
228
229 // 3. Stop periodic sync — we control sync explicitly in this test
230 // Set stopped=true to block the initial 5s delayed syncAll() too
231 (rmA as any).stopped = true;
232 (rmB as any).stopped = true;
233 if ((rmA as any).syncTimer) { clearInterval((rmA as any).syncTimer); (rmA as any).syncTimer = null; }
234 if ((rmB as any).syncTimer) { clearInterval((rmB as any).syncTimer); (rmB as any).syncTimer = null; }
235
236 // Purge leftover cross-DIDs from previous runs
237 const selfDidA = configA.DID;
238 const selfDidB = configB.DID;
239 if (selfDidA) {
240 for (const did of rmA.getReplicateDids()) {
241 if (did !== selfDidA) { await rmA.removeDid(did, true); log(`Purged leftover ${did} from Node A`); }
242 }
243 }
244 if (selfDidB) {
245 for (const did of rmB.getReplicateDids()) {
246 if (did !== selfDidB) { await rmB.removeDid(did, true); log(`Purged leftover ${did} from Node B`); }
247 }
248 }
249
250 // 4. Set up syncDid interception BEFORE OAuth — captures self-sync + cross-sync promises
251 const captureA = interceptSyncDid(rmA);
252 const captureB = interceptSyncDid(rmB);
253
254 // 5. Skip blob sync — blocks-only is sufficient for smoke test
255 const noopBlobs = async () => ({ fetched: 0, skipped: 0, errors: 0, totalBytes: 0 });
256 (rmA as any).syncBlobs = noopBlobs;
257 (rmB as any).syncBlobs = noopBlobs;
258
259 // 5. Check if already authenticated (identity loaded from DB on startup)
260 const alreadyAuthedA = !!configA.DID;
261 const alreadyAuthedB = !!configB.DID;
262
263 if (alreadyAuthedA && alreadyAuthedB) {
264 ok(`Reusing sessions: A=${configA.DID} (@${configA.HANDLE ?? "?"}), B=${configB.DID} (@${configB.HANDLE ?? "?"})`);
265 } else {
266 const urlA = `http://localhost:${portA}/oauth/login?handle=${encodeURIComponent(handleA!)}`;
267 const urlB = `http://localhost:${portB}/oauth/login?handle=${encodeURIComponent(handleB!)}`;
268
269 console.log();
270 console.log("\x1b[1m=== Opening OAuth login in browser ===\x1b[0m");
271 console.log();
272 if (!alreadyAuthedA) console.log(` Node A (${handleA}): ${urlA}`);
273 if (!alreadyAuthedB) console.log(` Node B (${handleB}): ${urlB}`);
274 console.log();
275
276 try {
277 if (!alreadyAuthedA) execSync(`open ${JSON.stringify(urlA)}`);
278 if (!alreadyAuthedB) execSync(`open ${JSON.stringify(urlB)}`);
279 log("Opened login URLs in your browser. Please authenticate.");
280 } catch {
281 log("Could not auto-open browser. Please open the URLs above manually.");
282 }
283
284 log("Waiting for authentication (Ctrl+C to abort)...");
285 }
286
287 // 6. Await identity events (resolves immediately if already authed)
288 const [didA, didB] = await Promise.all([identityA, identityB]);
289
290 ok(`Node A: ${didA} (@${configA.HANDLE ?? "unknown"})`);
291 ok(`Node B: ${didB} (@${configB.HANDLE ?? "unknown"})`);
292
293 // 7. Self-sync: if fresh OAuth, wait for the auto-triggered sync.
294 // If session reused but data is missing (cleared), trigger sync explicitly.
295 const selfStateA = rmA.getSyncStorage().getState(didA);
296 const selfStateB = rmB.getSyncStorage().getState(didB);
297 const needSyncA = !selfStateA || selfStateA.status !== "synced";
298 const needSyncB = !selfStateB || selfStateB.status !== "synced";
299
300 if (needSyncA || needSyncB) {
301 log("Self-sync needed...");
302 const waits: Promise<void>[] = [];
303 if (needSyncA && alreadyAuthedA) {
304 // Session reused but data cleared — trigger self-sync explicitly
305 rmA.addDid(didA).catch(() => {});
306 }
307 if (needSyncB && alreadyAuthedB) {
308 rmB.addDid(didB).catch(() => {});
309 }
310 if (needSyncA) waits.push(captureA.awaitSync(didA));
311 if (needSyncB) waits.push(captureB.awaitSync(didB));
312 await Promise.all(waits);
313 ok("Self-sync complete on both nodes");
314 } else {
315 ok("Self-sync data already present — skipping");
316 }
317
318 // 8. Dial peers to establish libp2p connections (use local TCP, not relay)
319 const addrsA = ipfsA.getMultiaddrs();
320 const addrsB = ipfsB.getMultiaddrs();
321 const pickLocal = (addrs: typeof addrsA) =>
322 addrs.find((a) => {
323 const s = a.toString();
324 return s.includes("/ip4/127.0.0.1/tcp/") && !s.includes("/ws/") && !s.includes("/p2p-circuit/");
325 });
326 const localA = pickLocal(addrsA);
327 const localB = pickLocal(addrsB);
328 if (!localA || !localB) {
329 fail(`No local TCP addr (A: ${!!localA}, B: ${!!localB})`);
330 }
331 log(`Node A local: ${localA}`);
332 log(`Node B local: ${localB}`);
333
334 await Promise.all([
335 ipfsA.dial(localB),
336 ipfsB.dial(localA),
337 ]);
338 ok("Peer connections established via libp2p");
339
340 // 9. Add cross-DIDs — triggers syncDid (stopped=true only blocks syncAll, not syncDid)
341 log("Adding cross-DIDs and syncing via libp2p...");
342 await rmA.addDid(didB);
343 ok(`Node A now tracking ${didB}`);
344 await rmB.addDid(didA);
345 ok(`Node B now tracking ${didA}`);
346
347 // 10. Wait for cross-sync completion
348 await Promise.all([
349 captureA.awaitSync(didB),
350 captureB.awaitSync(didA),
351 ]);
352 ok("Cross-sync complete");
353
354 // 11. Verify cross-sync used libp2p (not HTTP PDS fallback)
355 const historyA = rmA.getSyncStorage().getSyncHistory(didB, 1);
356 const historyB = rmB.getSyncStorage().getSyncHistory(didA, 1);
357
358 if (historyA.length === 0) fail("No sync history on Node A for cross-sync");
359 if (historyB.length === 0) fail("No sync history on Node B for cross-sync");
360
361 const sourceA = historyA[0]!.sourceType;
362 const sourceB = historyB[0]!.sourceType;
363
364 if (sourceA !== "libp2p") {
365 fail(`Node A cross-sync used "${sourceA}" instead of "libp2p"`);
366 }
367 ok(`Node A cross-synced ${didB} via libp2p`);
368
369 if (sourceB !== "libp2p") {
370 fail(`Node B cross-sync used "${sourceB}" instead of "libp2p"`);
371 }
372 ok(`Node B cross-synced ${didA} via libp2p`);
373
374 // 12. Verify sync state
375 const syncStateA = rmA.getSyncStorage().getState(didB);
376 if (!syncStateA || syncStateA.status !== "synced") {
377 fail(`Node A sync status for ${didB}: ${syncStateA?.status ?? "missing"}`);
378 }
379 ok(`Node A synced ${didB}`);
380
381 const syncStateB = rmB.getSyncStorage().getState(didA);
382 if (!syncStateB || syncStateB.status !== "synced") {
383 fail(`Node B sync status for ${didA}: ${syncStateB?.status ?? "missing"}`);
384 }
385 ok(`Node B synced ${didA}`);
386
387 // 13. Verify cross-serving via HTTP reads (one-shot, not polling)
388 log("Verifying cross-serving...");
389
390 // Node A describes Node B's repo
391 const descA = await fetchJson<{ did: string; collections: string[] }>(
392 `http://localhost:${portA}/xrpc/com.atproto.repo.describeRepo?repo=${encodeURIComponent(didB)}`,
393 );
394 if (descA.did !== didB) fail(`describeRepo DID mismatch: ${descA.did} !== ${didB}`);
395 ok(`Node A describes ${didB}: ${descA.collections.length} collections`);
396
397 // Node B describes Node A's repo
398 const descB = await fetchJson<{ did: string; collections: string[] }>(
399 `http://localhost:${portB}/xrpc/com.atproto.repo.describeRepo?repo=${encodeURIComponent(didA)}`,
400 );
401 if (descB.did !== didA) fail(`describeRepo DID mismatch: ${descB.did} !== ${didA}`);
402 ok(`Node B describes ${didA}: ${descB.collections.length} collections`);
403
404 // Node A lists records from Node B (ceiling of 10 per collection, first 5 collections)
405 let totalRecsA = 0;
406 for (const coll of descA.collections.slice(0, 5)) {
407 const recs = await fetchJson<{ records: Array<{ uri: string }> }>(
408 `http://localhost:${portA}/xrpc/com.atproto.repo.listRecords?repo=${encodeURIComponent(didB)}&collection=${encodeURIComponent(coll)}&limit=${RECORD_CEILING}`,
409 );
410 totalRecsA += recs.records.length;
411 if (recs.records.length > 0) {
412 log(` ${coll}: ${recs.records.length} records`);
413 }
414 }
415 if (totalRecsA === 0) fail("Node A served 0 records for Node B");
416 ok(`Node A serves ${totalRecsA} records for ${didB}`);
417
418 // Node B lists records from Node A (ceiling of 10 per collection, first 5 collections)
419 let totalRecsB = 0;
420 for (const coll of descB.collections.slice(0, 5)) {
421 const recs = await fetchJson<{ records: Array<{ uri: string }> }>(
422 `http://localhost:${portB}/xrpc/com.atproto.repo.listRecords?repo=${encodeURIComponent(didA)}&collection=${encodeURIComponent(coll)}&limit=${RECORD_CEILING}`,
423 );
424 totalRecsB += recs.records.length;
425 if (recs.records.length > 0) {
426 log(` ${coll}: ${recs.records.length} records`);
427 }
428 }
429 if (totalRecsB === 0) fail("Node B served 0 records for Node A");
430 ok(`Node B serves ${totalRecsB} records for ${didA}`);
431
432 // Node A serves Node B's repo via getRepo
433 const getRepoA = await fetch(
434 `http://localhost:${portA}/xrpc/com.atproto.sync.getRepo?did=${encodeURIComponent(didB)}`,
435 );
436 if (getRepoA.status !== 200) fail(`getRepo failed: ${getRepoA.status}`);
437 const carA = new Uint8Array(await getRepoA.arrayBuffer());
438 ok(`Node A serves ${didB} repo CAR (${(carA.length / 1024).toFixed(1)} KB)`);
439
440 // Node B serves Node A's repo via getRepo
441 const getRepoB = await fetch(
442 `http://localhost:${portB}/xrpc/com.atproto.sync.getRepo?did=${encodeURIComponent(didA)}`,
443 );
444 if (getRepoB.status !== 200) fail(`getRepo failed: ${getRepoB.status}`);
445 const carB = new Uint8Array(await getRepoB.arrayBuffer());
446 ok(`Node B serves ${didA} repo CAR (${(carB.length / 1024).toFixed(1)} KB)`);
447
448 // 14. Summary
449 console.log();
450 console.log("\x1b[1m=== Summary ===\x1b[0m");
451 console.log(` Node A (@${configA.HANDLE ?? "?"}): verified ${totalRecsA} records served for @${configB.HANDLE ?? "?"}`);
452 console.log(` Node B (@${configB.HANDLE ?? "?"}): verified ${totalRecsB} records served for @${configA.HANDLE ?? "?"}`);
453 console.log(` Cross-sync transport: libp2p (both directions)`);
454 console.log();
455
456 // 15. Purge cross-sync data (keep self-sync for fast re-runs)
457 log("Purging cross-replicated data...");
458 await rmA.removeDid(didB, true);
459 ok(`Node A purged ${didB}`);
460 await rmB.removeDid(didA, true);
461 ok(`Node B purged ${didA}`);
462
463 // 16. Logout only if --clean (otherwise keep sessions for re-runs)
464 if (cleanDirs) {
465 log("Disconnecting identities...");
466 await fetch(`http://localhost:${portA}/oauth/logout?disconnect=true`, { method: "POST" });
467 ok("Node A disconnected");
468 await fetch(`http://localhost:${portB}/oauth/logout?disconnect=true`, { method: "POST" });
469 ok("Node B disconnected");
470 } else {
471 log("Keeping sessions for re-runs (use --clean to disconnect)");
472 }
473
474 console.log();
475 ok("All checks passed!");
476
477 } catch (err) {
478 const msg = err instanceof Error ? err.stack ?? err.message : String(err);
479 console.error(`\x1b[31m[FAIL]\x1b[0m ${msg}`);
480 } finally {
481 await cleanup();
482 }
483}
484
// Entry point. Errors are handled inside main()'s catch/finally; the only
// unhandled-rejection path is a failure before its try block (e.g. mkdirSync).
main();