atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 485 lines 18 kB view raw
/**
 * Real OAuth Bidirectional Replication Smoke Test
 *
 * Starts two p2pds servers with IPFS networking enabled, guides user
 * through OAuth in the browser, then automates: self-sync, peer dial,
 * cross-sync via libp2p, verify cross-serving, cleanup.
 *
 * No polling or timeouts — uses property interception for OAuth identity
 * events, promise capture for syncDid calls, and deterministic dial().
 *
 * Usage: npx tsx scripts/real-bidir-test.ts alice.bsky.social bob.bsky.social [--clean]
 *
 * Sessions persist in stable data dirs — re-runs skip OAuth and self-sync.
 * Use --clean to wipe data dirs and disconnect after the test.
 */

import { mkdirSync, rmSync, existsSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { execSync } from "node:child_process";
import { startServer, type ServerHandle } from "../src/start.js";
import type { Config } from "../src/config.js";
import type { ReplicationManager } from "../src/replication/replication-manager.js";
import type { IpfsService } from "../src/ipfs.js";

// ---------------------------------------------------------------------------
// Shared state for signal-safe cleanup
// ---------------------------------------------------------------------------

let serverA: ServerHandle | undefined;
let serverB: ServerHandle | undefined;
let tmpA: string | undefined;
let tmpB: string | undefined;
let cleanDirs = false;

/** Memoized teardown so concurrent callers share a single run. */
let cleanupPromise: Promise<void> | undefined;

/**
 * Shut down both servers and, when `cleanDirs` is set, remove the data dirs.
 *
 * Idempotent: the first call starts the teardown and every later call
 * (from `fail()`, a signal handler, or the `finally` in `main`) receives the
 * same promise, so servers are never closed twice. The returned promise
 * does not reject; individual teardown errors are handled inside.
 */
function cleanup(): Promise<void> {
  if (!cleanupPromise) cleanupPromise = runCleanup();
  return cleanupPromise;
}

/** Close a server handle, ignoring errors (teardown is best-effort). */
async function closeQuietly(server: ServerHandle | undefined): Promise<void> {
  if (!server) return;
  try {
    await server.close();
  } catch {
    // Deliberately ignored: a failed close must not block the rest of teardown.
  }
}

/** Performs the actual teardown; invoked at most once via `cleanup()`. */
async function runCleanup(): Promise<void> {
  log("Shutting down servers...");
  await closeQuietly(serverA);
  serverA = undefined;
  await closeQuietly(serverB);
  serverB = undefined;

  if (cleanDirs && (tmpA || tmpB)) {
    log("Cleaning up data dirs...");
    for (const dir of [tmpA, tmpB]) {
      if (!dir) continue;
      try {
        rmSync(dir, { recursive: true, force: true });
      } catch (err) {
        log(`Could not remove ${dir}: ${err instanceof Error ? err.message : String(err)}`);
      }
    }
    tmpA = undefined;
    tmpB = undefined;
  }

  // Brief settle for async gossipsub teardown
  await new Promise<void>((resolve) => setTimeout(resolve, 500));
  log("Done.");
}

// Suppress gossipsub StreamStateError (async background streams during peer connect/disconnect)
process.on("uncaughtException", (err) => {
  if (err?.constructor?.name === "StreamStateError") return;
  console.error("Uncaught:", err);
  void cleanup().finally(() => process.exit(1));
});

// Handle SIGINT/SIGTERM for clean shutdown when killed
for (const sig of ["SIGINT", "SIGTERM"] as const) {
  process.on(sig, () => {
    log(`Caught ${sig}, cleaning up...`);
    void cleanup().finally(() => process.exit(1));
  });
}

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/** Print an informational line with a cyan `[test]` prefix. */
function log(msg: string) {
  console.log(`\x1b[36m[test]\x1b[0m ${msg}`);
}

/** Error thrown by `fail()`; carries the failure message that was already printed. */
class TestFailure extends Error {
  constructor(message: string) {
    super(message);
    this.name = "TestFailure";
  }
}

/**
 * Report a failed check and abort the test.
 *
 * Prints the message, marks the process as failed, starts the shared
 * teardown (exiting with code 1 once it completes), and throws so the
 * caller's control flow stops immediately.
 *
 * @throws TestFailure always — carries `msg` rather than a placeholder so
 *   any handler that prints the error shows the real cause.
 */
function fail(msg: string): never {
  console.error(`\x1b[31m[FAIL]\x1b[0m ${msg}`);
  // Set the exit code up front so the failure is reported even if the
  // process ends through a path other than the explicit exit below.
  process.exitCode = 1;
  void cleanup().finally(() => process.exit(1));
  throw new TestFailure(msg);
}

/** Print a success line with a green `[ OK ]` prefix. */
function ok(msg: string) {
  console.log(`\x1b[32m[ OK ]\x1b[0m ${msg}`);
}

/**
 * Intercept config.DID setter so we get an event-driven promise
 * that resolves when the OAuth callback establishes identity.
 * No polling — the promise is resolved from inside the setter.
 *
 * Replaces `config.DID` with an accessor property backed by a local
 * variable. If a DID is already present when called, the promise resolves
 * immediately with it.
 *
 * @param config Server config whose `DID` property is redefined in place.
 * @returns Promise resolving with the first truthy DID observed.
 */
function withIdentityPromise(config: Config): Promise<string> {
  let currentDid = config.DID;
  let pendingResolve: ((did: string) => void) | undefined;

  const promise = new Promise<string>((resolve) => {
    if (currentDid) {
      resolve(currentDid);
      return;
    }
    pendingResolve = resolve;
  });

  Object.defineProperty(config, "DID", {
    get: () => currentDid,
    set: (value: string | undefined) => {
      currentDid = value;
      if (value && pendingResolve) {
        pendingResolve(value);
        // Only the first identity event resolves the promise.
        pendingResolve = undefined;
      }
    },
    enumerable: true,
    configurable: true,
  });

  return promise;
}

/**
 * Intercept syncDid on a ReplicationManager to capture per-DID promises.
 *
 * Returns an `awaitSync(did)` function that:
 *   - Returns the stored promise if syncDid was already called for that DID
 *   - Otherwise waits (event-driven, no polling) until syncDid is called
 *
 * The wrapper forwards every argument to the original method, and any
 * number of callers may wait on the same DID.
 */
function interceptSyncDid(rm: ReplicationManager) {
  type SyncDidArgs = Parameters<ReplicationManager["syncDid"]>;

  const captured = new Map<string, Promise<void>>();
  const waiters = new Map<string, Array<(syncPromise: Promise<void>) => void>>();

  const origSyncDid = rm.syncDid.bind(rm);
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- replacing a method on a live instance
  (rm as any).syncDid = (...args: SyncDidArgs) => {
    const syncPromise = origSyncDid(...args);
    const did = args[0];
    // The most recent call wins for later awaitSync() lookups.
    captured.set(did, syncPromise);
    const pending = waiters.get(did);
    if (pending) {
      waiters.delete(did);
      for (const notify of pending) notify(syncPromise);
    }
    return syncPromise;
  };

  return {
    awaitSync(did: string): Promise<void> {
      const known = captured.get(did);
      if (known) return known;
      return new Promise<void>((resolve, reject) => {
        const queue = waiters.get(did) ?? [];
        queue.push((p) => p.then(resolve, reject));
        waiters.set(did, queue);
      });
    },
  };
}

/**
 * Build the server config for one smoke-test node.
 *
 * @param dataDir Directory used as the node's `DATA_DIR`.
 * @param port Port used for both `PORT` and the `localhost:<port>` hostname.
 */
function makeConfig(dataDir: string, port: number): Config {
  return {
    PDS_HOSTNAME: `localhost:${port}`,
    AUTH_TOKEN: "smoke-test-token",
    JWT_SECRET: "smoke-jwt-secret",
    PASSWORD_HASH: "$2a$10$test",
    DATA_DIR: dataDir,
    PORT: port,
    IPFS_ENABLED: true,
    IPFS_NETWORKING: true,
    REPLICATE_DIDS: [],
    FIREHOSE_URL: "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos",
    FIREHOSE_ENABLED: false,
    RATE_LIMIT_ENABLED: false,
    RATE_LIMIT_READ_PER_MIN: 300,
    RATE_LIMIT_SYNC_PER_MIN: 30,
    RATE_LIMIT_SESSION_PER_MIN: 10,
    RATE_LIMIT_WRITE_PER_MIN: 200,
    RATE_LIMIT_CHALLENGE_PER_MIN: 20,
    RATE_LIMIT_MAX_CONNECTIONS: 100,
    RATE_LIMIT_FIREHOSE_PER_IP: 3,
    OAUTH_ENABLED: true,
  };
}

/**
 * Fetch a URL and parse the response body as JSON.
 *
 * The result is cast to `T` without runtime validation.
 *
 * @throws Error with the HTTP status and response text when the response is not OK.
 */
async function fetchJson<T>(url: string, opts?: RequestInit): Promise<T> {
  const res = await fetch(url, opts);
  if (!res.ok) {
    const text = await res.text();
    throw new Error(`HTTP ${res.status}: ${text}`);
  }
  return (await res.json()) as T;
}
// ---------------------------------------------------------------------------
// Main
// ---------------------------------------------------------------------------

/** Maximum number of records requested per collection in the listRecords check. */
const RECORD_CEILING = 10;

/** Number of collections sampled per repo in the listRecords check. */
const COLLECTION_SAMPLE = 5;

/** Shell command that opens `url` in the default browser on the current platform. */
function browserOpenCommand(url: string): string {
  const quoted = JSON.stringify(url);
  switch (process.platform) {
    case "darwin":
      return `open ${quoted}`;
    case "win32":
      // The empty "" is the window title argument that `start` expects first.
      return `start "" ${quoted}`;
    default:
      return `xdg-open ${quoted}`;
  }
}

/** Disable a manager's background sync so the test drives every sync explicitly. */
function stopPeriodicSync(rm: ReplicationManager): void {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- reaching into scheduler internals
  const internals = rm as any;
  // Set stopped=true to block the initial 5s delayed syncAll() too
  internals.stopped = true;
  if (internals.syncTimer) {
    clearInterval(internals.syncTimer);
    internals.syncTimer = null;
  }
}

/** Remove every replicated DID other than the node's own (left over from previous runs). */
async function purgeLeftoverDids(
  rm: ReplicationManager,
  selfDid: string | undefined,
  label: string,
): Promise<void> {
  if (!selfDid) return;
  // Snapshot first: removeDid() may mutate the list being iterated.
  for (const did of [...rm.getReplicateDids()]) {
    if (did === selfDid) continue;
    await rm.removeDid(did, true);
    log(`Purged leftover ${did} from ${label}`);
  }
}

/** Kick off a self-sync without awaiting it; failures are logged instead of silently dropped. */
function triggerSelfSync(rm: ReplicationManager, did: string, label: string): void {
  rm.addDid(did).catch((err: unknown) => {
    log(`${label} addDid(${did}) failed: ${err instanceof Error ? err.message : String(err)}`);
  });
}

/** Call describeRepo on the node at `port` and fail the test if the returned DID differs. */
async function describeRepo(
  port: number,
  did: string,
): Promise<{ did: string; collections: string[] }> {
  const desc = await fetchJson<{ did: string; collections: string[] }>(
    `http://localhost:${port}/xrpc/com.atproto.repo.describeRepo?repo=${encodeURIComponent(did)}`,
  );
  if (desc.did !== did) fail(`describeRepo DID mismatch: ${desc.did} !== ${did}`);
  return desc;
}

/** List up to RECORD_CEILING records from the first COLLECTION_SAMPLE collections; returns the total. */
async function countServedRecords(
  port: number,
  did: string,
  collections: string[],
): Promise<number> {
  let total = 0;
  for (const coll of collections.slice(0, COLLECTION_SAMPLE)) {
    const recs = await fetchJson<{ records: Array<{ uri: string }> }>(
      `http://localhost:${port}/xrpc/com.atproto.repo.listRecords?repo=${encodeURIComponent(did)}&collection=${encodeURIComponent(coll)}&limit=${RECORD_CEILING}`,
    );
    total += recs.records.length;
    if (recs.records.length > 0) {
      log(`  ${coll}: ${recs.records.length} records`);
    }
  }
  return total;
}

/** Download a repo CAR via getRepo from the node at `port`; returns its size in bytes. */
async function fetchRepoCarSize(port: number, did: string): Promise<number> {
  const res = await fetch(
    `http://localhost:${port}/xrpc/com.atproto.sync.getRepo?did=${encodeURIComponent(did)}`,
  );
  if (res.status !== 200) fail(`getRepo failed: ${res.status}`);
  return new Uint8Array(await res.arrayBuffer()).length;
}

/** POST the OAuth logout/disconnect endpoint and report whether the node accepted it. */
async function disconnectNode(port: number, label: string): Promise<void> {
  const res = await fetch(`http://localhost:${port}/oauth/logout?disconnect=true`, { method: "POST" });
  if (res.ok) {
    ok(`${label} disconnected`);
  } else {
    log(`${label} logout returned HTTP ${res.status}; session may still be active`);
  }
}

/**
 * Run the smoke test end to end.
 *
 * Reads two handles (and an optional `--clean` flag) from the command line,
 * starts both nodes, authenticates, syncs in both directions, verifies
 * the replicated data is served over HTTP, then tears everything down.
 * Any error marks the process as failed (exit code 1) before cleanup runs.
 */
async function main(): Promise<void> {
  const cliArgs = process.argv.slice(2);
  const args = cliArgs.filter((a) => !a.startsWith("--"));
  const flags = new Set(cliArgs.filter((a) => a.startsWith("--")));
  cleanDirs = flags.has("--clean");

  const [handleA, handleB] = args;
  if (handleA === undefined || handleB === undefined) {
    console.log("Usage: npx tsx scripts/real-bidir-test.ts <handle-a> <handle-b> [--clean]");
    console.log("  --clean  Remove data dirs after test (default: keep for fast re-runs)");
    console.log("Example: npx tsx scripts/real-bidir-test.ts alice.bsky.social bob.bsky.social");
    process.exit(1);
  }

  const portA = 3100;
  const portB = 3101;

  // 1. Stable data dirs (reused across runs to keep OAuth sessions)
  const dirA = join(tmpdir(), "p2pds-smoke-a");
  const dirB = join(tmpdir(), "p2pds-smoke-b");
  tmpA = dirA;
  tmpB = dirB;

  try {
    mkdirSync(dirA, { recursive: true });
    mkdirSync(dirB, { recursive: true });
    log(`Data dirs: ${dirA}, ${dirB}`);

    // 2. Start servers with identity interception
    const configA = makeConfig(dirA, portA);
    const configB = makeConfig(dirB, portB);
    const identityA = withIdentityPromise(configA);
    const identityB = withIdentityPromise(configB);

    log(`Starting Node A on port ${portA}...`);
    serverA = await startServer(configA);
    log(`Starting Node B on port ${portB}...`);
    serverB = await startServer(configB);
    ok("Both servers started");

    const rmA = serverA.replicationManager;
    const rmB = serverB.replicationManager;
    const ipfsA = serverA.ipfsService;
    const ipfsB = serverB.ipfsService;
    if (!rmA || !rmB) fail("ReplicationManager not available on one or both servers");
    if (!ipfsA || !ipfsB) fail("IpfsService not available on one or both servers");

    // 3. Stop periodic sync — we control sync explicitly in this test
    stopPeriodicSync(rmA);
    stopPeriodicSync(rmB);

    // Purge leftover cross-DIDs from previous runs
    await purgeLeftoverDids(rmA, configA.DID, "Node A");
    await purgeLeftoverDids(rmB, configB.DID, "Node B");

    // 4. Set up syncDid interception BEFORE OAuth — captures self-sync + cross-sync promises
    const captureA = interceptSyncDid(rmA);
    const captureB = interceptSyncDid(rmB);

    // 5. Skip blob sync — blocks-only is sufficient for smoke test
    const noopBlobs = async () => ({ fetched: 0, skipped: 0, errors: 0, totalBytes: 0 });
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- replacing a method on a live instance
    (rmA as any).syncBlobs = noopBlobs;
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- replacing a method on a live instance
    (rmB as any).syncBlobs = noopBlobs;

    // 6. Check if already authenticated (identity loaded from DB on startup)
    const alreadyAuthedA = !!configA.DID;
    const alreadyAuthedB = !!configB.DID;

    if (alreadyAuthedA && alreadyAuthedB) {
      ok(`Reusing sessions: A=${configA.DID} (@${configA.HANDLE ?? "?"}), B=${configB.DID} (@${configB.HANDLE ?? "?"})`);
    } else {
      const urlA = `http://localhost:${portA}/oauth/login?handle=${encodeURIComponent(handleA)}`;
      const urlB = `http://localhost:${portB}/oauth/login?handle=${encodeURIComponent(handleB)}`;

      console.log();
      console.log("\x1b[1m=== Opening OAuth login in browser ===\x1b[0m");
      console.log();
      if (!alreadyAuthedA) console.log(`  Node A (${handleA}): ${urlA}`);
      if (!alreadyAuthedB) console.log(`  Node B (${handleB}): ${urlB}`);
      console.log();

      try {
        if (!alreadyAuthedA) execSync(browserOpenCommand(urlA));
        if (!alreadyAuthedB) execSync(browserOpenCommand(urlB));
        log("Opened login URLs in your browser. Please authenticate.");
      } catch {
        log("Could not auto-open browser. Please open the URLs above manually.");
      }

      log("Waiting for authentication (Ctrl+C to abort)...");
    }

    // 7. Await identity events (resolves immediately if already authed)
    const [didA, didB] = await Promise.all([identityA, identityB]);

    ok(`Node A: ${didA} (@${configA.HANDLE ?? "unknown"})`);
    ok(`Node B: ${didB} (@${configB.HANDLE ?? "unknown"})`);

    // 8. Self-sync: if fresh OAuth, wait for the auto-triggered sync.
    //    If session reused but data is missing (cleared), trigger sync explicitly.
    const selfStateA = rmA.getSyncStorage().getState(didA);
    const selfStateB = rmB.getSyncStorage().getState(didB);
    const needSyncA = !selfStateA || selfStateA.status !== "synced";
    const needSyncB = !selfStateB || selfStateB.status !== "synced";

    if (needSyncA || needSyncB) {
      log("Self-sync needed...");
      // Session reused but data cleared — trigger self-sync explicitly
      if (needSyncA && alreadyAuthedA) triggerSelfSync(rmA, didA, "Node A");
      if (needSyncB && alreadyAuthedB) triggerSelfSync(rmB, didB, "Node B");

      const waits: Promise<void>[] = [];
      if (needSyncA) waits.push(captureA.awaitSync(didA));
      if (needSyncB) waits.push(captureB.awaitSync(didB));
      await Promise.all(waits);
      ok("Self-sync complete on both nodes");
    } else {
      ok("Self-sync data already present — skipping");
    }

    // 9. Dial peers to establish libp2p connections (use local TCP, not relay)
    const addrsA = ipfsA.getMultiaddrs();
    const addrsB = ipfsB.getMultiaddrs();
    const pickLocal = (addrs: typeof addrsA) =>
      addrs.find((a) => {
        const s = a.toString();
        return s.includes("/ip4/127.0.0.1/tcp/") && !s.includes("/ws/") && !s.includes("/p2p-circuit/");
      });
    const localA = pickLocal(addrsA);
    const localB = pickLocal(addrsB);
    if (!localA || !localB) {
      fail(`No local TCP addr (A: ${!!localA}, B: ${!!localB})`);
    }
    log(`Node A local: ${localA}`);
    log(`Node B local: ${localB}`);

    await Promise.all([ipfsA.dial(localB), ipfsB.dial(localA)]);
    ok("Peer connections established via libp2p");

    // 10. Add cross-DIDs — triggers syncDid (stopped=true only blocks syncAll, not syncDid)
    log("Adding cross-DIDs and syncing via libp2p...");
    await rmA.addDid(didB);
    ok(`Node A now tracking ${didB}`);
    await rmB.addDid(didA);
    ok(`Node B now tracking ${didA}`);

    // 11. Wait for cross-sync completion
    await Promise.all([captureA.awaitSync(didB), captureB.awaitSync(didA)]);
    ok("Cross-sync complete");

    // 12. Verify cross-sync used libp2p (not HTTP PDS fallback)
    const [latestA] = rmA.getSyncStorage().getSyncHistory(didB, 1);
    const [latestB] = rmB.getSyncStorage().getSyncHistory(didA, 1);

    if (!latestA) fail("No sync history on Node A for cross-sync");
    if (!latestB) fail("No sync history on Node B for cross-sync");

    const sourceA = latestA.sourceType;
    const sourceB = latestB.sourceType;

    if (sourceA !== "libp2p") {
      fail(`Node A cross-sync used "${sourceA}" instead of "libp2p"`);
    }
    ok(`Node A cross-synced ${didB} via libp2p`);

    if (sourceB !== "libp2p") {
      fail(`Node B cross-sync used "${sourceB}" instead of "libp2p"`);
    }
    ok(`Node B cross-synced ${didA} via libp2p`);

    // 13. Verify sync state
    const syncStateA = rmA.getSyncStorage().getState(didB);
    if (!syncStateA || syncStateA.status !== "synced") {
      fail(`Node A sync status for ${didB}: ${syncStateA?.status ?? "missing"}`);
    }
    ok(`Node A synced ${didB}`);

    const syncStateB = rmB.getSyncStorage().getState(didA);
    if (!syncStateB || syncStateB.status !== "synced") {
      fail(`Node B sync status for ${didA}: ${syncStateB?.status ?? "missing"}`);
    }
    ok(`Node B synced ${didA}`);

    // 14. Verify cross-serving via HTTP reads (one-shot, not polling)
    log("Verifying cross-serving...");

    // Each node describes the other node's repo
    const descA = await describeRepo(portA, didB);
    ok(`Node A describes ${didB}: ${descA.collections.length} collections`);

    const descB = await describeRepo(portB, didA);
    ok(`Node B describes ${didA}: ${descB.collections.length} collections`);

    // Each node lists records from the other (ceiling of 10 per collection, first 5 collections)
    const totalRecsA = await countServedRecords(portA, didB, descA.collections);
    if (totalRecsA === 0) fail("Node A served 0 records for Node B");
    ok(`Node A serves ${totalRecsA} records for ${didB}`);

    const totalRecsB = await countServedRecords(portB, didA, descB.collections);
    if (totalRecsB === 0) fail("Node B served 0 records for Node A");
    ok(`Node B serves ${totalRecsB} records for ${didA}`);

    // Each node serves the other's repo via getRepo
    const carBytesA = await fetchRepoCarSize(portA, didB);
    ok(`Node A serves ${didB} repo CAR (${(carBytesA / 1024).toFixed(1)} KB)`);

    const carBytesB = await fetchRepoCarSize(portB, didA);
    ok(`Node B serves ${didA} repo CAR (${(carBytesB / 1024).toFixed(1)} KB)`);

    // 15. Summary
    console.log();
    console.log("\x1b[1m=== Summary ===\x1b[0m");
    console.log(`  Node A (@${configA.HANDLE ?? "?"}): verified ${totalRecsA} records served for @${configB.HANDLE ?? "?"}`);
    console.log(`  Node B (@${configB.HANDLE ?? "?"}): verified ${totalRecsB} records served for @${configA.HANDLE ?? "?"}`);
    console.log(`  Cross-sync transport: libp2p (both directions)`);
    console.log();

    // 16. Purge cross-sync data (keep self-sync for fast re-runs)
    log("Purging cross-replicated data...");
    await rmA.removeDid(didB, true);
    ok(`Node A purged ${didB}`);
    await rmB.removeDid(didA, true);
    ok(`Node B purged ${didA}`);

    // 17. Logout only if --clean (otherwise keep sessions for re-runs)
    if (cleanDirs) {
      log("Disconnecting identities...");
      await disconnectNode(portA, "Node A");
      await disconnectNode(portB, "Node B");
    } else {
      log("Keeping sessions for re-runs (use --clean to disconnect)");
    }

    console.log();
    ok("All checks passed!");
  } catch (err) {
    // A smoke test that fails must not exit 0: record the failure before teardown.
    process.exitCode = 1;
    const msg = err instanceof Error ? err.stack ?? err.message : String(err);
    console.error(`\x1b[31m[FAIL]\x1b[0m ${msg}`);
  } finally {
    await cleanup();
  }
}

main().catch((err: unknown) => {
  // Only reachable if teardown itself throws; report it and exit non-zero.
  console.error("Uncaught:", err);
  process.exit(1);
});