atproto user agency toolkit for individuals and groups
7
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add capstone E2E test: full bidirectional libp2p replication flow

Exercises the complete on-protocol loop with real networking: two
startServer() instances, self-sync, peer discovery via mock PDS
records, cross-sync via CAR-over-libp2p, XRPC serving verification,
incremental re-sync, and mutual offers generating auto-policies.

+587
+587
src/capstone-e2e.test.ts
··· 1 + /** 2 + * Capstone E2E test: full on-protocol bidirectional replication. 3 + * 4 + * Two startServer() instances with IPFS_NETWORKING: true, real libp2p, 5 + * CAR-over-libp2p protocol. Exercises the complete loop: 6 + * self-sync -> peer dial -> cross-sync via libp2p -> XRPC serving -> 7 + * incremental re-sync -> mutual offers -> auto-policies 8 + */ 9 + 10 + import { describe, it, expect, afterEach } from "vitest"; 11 + import { mkdtempSync, rmSync } from "node:fs"; 12 + import { tmpdir } from "node:os"; 13 + import { join, resolve } from "node:path"; 14 + import Database from "better-sqlite3"; 15 + 16 + import type { Config } from "./config.js"; 17 + import { startServer, type ServerHandle } from "./start.js"; 18 + import { 19 + createTestRepo, 20 + createTestRepoWithUpdate, 21 + } from "./replication/test-helpers.js"; 22 + import type { DidResolver, DidDocument } from "./did-resolver.js"; 23 + import { PolicyEngine } from "./policy/engine.js"; 24 + import { OFFER_NSID, didToRkey } from "./replication/types.js"; 25 + 26 + const DID_A = "did:plc:capstone-alice"; 27 + const DID_B = "did:plc:capstone-bob"; 28 + 29 + function baseConfig(dataDir: string): Config { 30 + return { 31 + PDS_HOSTNAME: "local.test", 32 + AUTH_TOKEN: "test-auth-token", 33 + JWT_SECRET: "test-jwt-secret", 34 + PASSWORD_HASH: "$2a$10$test", 35 + DATA_DIR: dataDir, 36 + PORT: 0, 37 + IPFS_ENABLED: true, 38 + IPFS_NETWORKING: true, 39 + REPLICATE_DIDS: [], 40 + FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 41 + FIREHOSE_ENABLED: false, 42 + RATE_LIMIT_ENABLED: false, 43 + RATE_LIMIT_READ_PER_MIN: 300, 44 + RATE_LIMIT_SYNC_PER_MIN: 30, 45 + RATE_LIMIT_SESSION_PER_MIN: 10, 46 + RATE_LIMIT_WRITE_PER_MIN: 200, 47 + RATE_LIMIT_CHALLENGE_PER_MIN: 20, 48 + RATE_LIMIT_MAX_CONNECTIONS: 100, 49 + RATE_LIMIT_FIREHOSE_PER_IP: 3, 50 + OAUTH_ENABLED: false, 51 + }; 52 + } 53 + 54 + // ---- Enhanced mock PDS ---- 55 + 56 + interface EnhancedMockPds { 57 + url: string; 58 + port: number; 59 + close: () => Promise<void>; 60 + updateAccount: (did: string, carBytes: Uint8Array) => void; 61 + addRecord: (did: string, collection: string, rkey: string, value: unknown) => void; 62 + } 63 + 64 + async function startEnhancedMockPds( 65 + accounts: Array<{ did: string; carBytes: Uint8Array }>, 66 + ): Promise<EnhancedMockPds> { 67 + const { createServer } = await import("node:http"); 68 + 69 + const accountMap = new Map<string, Uint8Array>(); 70 + for (const a of accounts) { 71 + accountMap.set(a.did, a.carBytes); 72 + } 73 + 74 + // Records: did -> collection -> rkey -> { uri, cid, value } 75 + const records = new Map< 76 + string, 77 + Map<string, Map<string, { uri: string; cid: string; value: unknown }>> 78 + >(); 79 + 80 + const server = createServer((req, res) => { 81 + const url = new URL(req.url ?? "/", "http://localhost"); 82 + const pathname = url.pathname; 83 + 84 + if (pathname === "/xrpc/com.atproto.sync.getRepo") { 85 + const did = url.searchParams.get("did"); 86 + if (!did || !accountMap.has(did)) { 87 + res.writeHead(404, { "Content-Type": "application/json" }); 88 + res.end(JSON.stringify({ error: "RepoNotFound" })); 89 + return; 90 + } 91 + const carBytes = accountMap.get(did)!; 92 + res.writeHead(200, { 93 + "Content-Type": "application/vnd.ipld.car", 94 + "Content-Length": String(carBytes.length), 95 + }); 96 + res.end(Buffer.from(carBytes)); 97 + return; 98 + } 99 + 100 + if (pathname === "/xrpc/com.atproto.repo.listRecords") { 101 + const did = url.searchParams.get("repo"); 102 + const collection = url.searchParams.get("collection"); 103 + if (!did || !collection) { 104 + res.writeHead(400, { "Content-Type": "application/json" }); 105 + res.end(JSON.stringify({ error: "InvalidRequest" })); 106 + return; 107 + } 108 + const didRecords = records.get(did)?.get(collection); 109 + const recordList = didRecords ? Array.from(didRecords.values()) : []; 110 + res.writeHead(200, { "Content-Type": "application/json" }); 111 + res.end(JSON.stringify({ records: recordList })); 112 + return; 113 + } 114 + 115 + if (pathname === "/xrpc/com.atproto.repo.getRecord") { 116 + const did = url.searchParams.get("repo"); 117 + const collection = url.searchParams.get("collection"); 118 + const rkey = url.searchParams.get("rkey"); 119 + if (!did || !collection || !rkey) { 120 + res.writeHead(400, { "Content-Type": "application/json" }); 121 + res.end(JSON.stringify({ error: "InvalidRequest" })); 122 + return; 123 + } 124 + const record = records.get(did)?.get(collection)?.get(rkey); 125 + if (!record) { 126 + res.writeHead(404, { "Content-Type": "application/json" }); 127 + res.end(JSON.stringify({ error: "RecordNotFound" })); 128 + return; 129 + } 130 + res.writeHead(200, { "Content-Type": "application/json" }); 131 + res.end(JSON.stringify(record)); 132 + return; 133 + } 134 + 135 + res.writeHead(404, { "Content-Type": "application/json" }); 136 + res.end(JSON.stringify({ error: "NotFound" })); 137 + }); 138 + 139 + return new Promise((resolvePromise) => { 140 + server.listen(0, "127.0.0.1", () => { 141 + const addr = server.address() as { port: number }; 142 + const pdsUrl = `http://127.0.0.1:${addr.port}`; 143 + resolvePromise({ 144 + url: pdsUrl, 145 + port: addr.port, 146 + close: () => new Promise<void>((res) => server.close(() => res())), 147 + updateAccount: (did: string, carBytes: Uint8Array) => { 148 + accountMap.set(did, carBytes); 149 + }, 150 + addRecord: (did: string, collection: string, rkey: string, value: unknown) => { 151 + if (!records.has(did)) records.set(did, new Map()); 152 + const didMap = records.get(did)!; 153 + if (!didMap.has(collection)) didMap.set(collection, new Map()); 154 + didMap.get(collection)!.set(rkey, { 155 + uri: `at://${did}/${collection}/${rkey}`, 156 + cid: "bafytest", 157 + value, 158 + }); 159 + }, 160 + }); 161 + }); 162 + }); 163 + } 164 + 165 + function createMockDidResolver(mapping: Record<string, string>): DidResolver { 166 + return { 167 + resolve: async (did: string): Promise<DidDocument | null> => { 168 + const pdsUrl = mapping[did]; 169 + if (!pdsUrl) return null; 170 + return { 171 + id: did, 172 + service: [ 173 + { 174 + id: "#atproto_pds", 175 + type: "AtprotoPersonalDataServer", 176 + serviceEndpoint: pdsUrl, 177 + }, 178 + ], 179 + } as unknown as DidDocument; 180 + }, 181 + } as DidResolver; 182 + } 183 + 184 + function createMockRecordWriter(did: string, pds: EnhancedMockPds) { 185 + return { 186 + putRecord: async (collection: string, rkey: string, record: unknown) => { 187 + pds.addRecord(did, collection, rkey, record); 188 + return { uri: `at://${did}/${collection}/${rkey}`, cid: "bafytest" }; 189 + }, 190 + deleteRecord: async (_collection: string, _rkey: string) => {}, 191 + listRecords: async (collection: string, _opts: { limit: number }) => { 192 + const res = await fetch( 193 + `${pds.url}/xrpc/com.atproto.repo.listRecords?repo=${encodeURIComponent(did)}&collection=${encodeURIComponent(collection)}&limit=100`, 194 + ); 195 + if (!res.ok) return { records: [] }; 196 + return (await res.json()) as { records: Array<{ uri: string; cid: string; value: unknown }> }; 197 + }, 198 + }; 199 + } 200 + 201 + /** 202 + * Wait for a condition to become true, with a timeout. 203 + */ 204 + async function waitFor( 205 + fn: () => Promise<boolean> | boolean, 206 + timeoutMs: number = 10_000, 207 + intervalMs: number = 200, 208 + ): Promise<void> { 209 + const deadline = Date.now() + timeoutMs; 210 + while (Date.now() < deadline) { 211 + if (await fn()) return; 212 + await new Promise((r) => setTimeout(r, intervalMs)); 213 + } 214 + throw new Error(`waitFor timed out after ${timeoutMs}ms`); 215 + } 216 + 217 + /** 218 + * Pre-create identity in the SQLite database (simulates OAuth already done). 219 + */ 220 + function presetIdentity(dataDir: string, did: string, handle: string): void { 221 + const db = new Database(resolve(dataDir, "pds.db")); 222 + db.pragma("journal_mode = WAL"); 223 + db.exec( 224 + "CREATE TABLE IF NOT EXISTS node_identity (did TEXT PRIMARY KEY, handle TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')))", 225 + ); 226 + db.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run(did, handle); 227 + db.close(); 228 + } 229 + 230 + // ---- Test suite ---- 231 + 232 + describe("Capstone E2E: on-protocol bidirectional replication", () => { 233 + let tmpDirA: string; 234 + let tmpDirB: string; 235 + let handleA: ServerHandle | undefined; 236 + let handleB: ServerHandle | undefined; 237 + let mockPds: EnhancedMockPds | undefined; 238 + 239 + afterEach(async () => { 240 + const closes: Promise<void>[] = []; 241 + if (handleA) closes.push(handleA.close().catch(() => {})); 242 + if (handleB) closes.push(handleB.close().catch(() => {})); 243 + await Promise.all(closes); 244 + handleA = undefined; 245 + handleB = undefined; 246 + 247 + if (mockPds) { await mockPds.close().catch(() => {}); mockPds = undefined; } 248 + if (tmpDirA) rmSync(tmpDirA, { recursive: true, force: true }); 249 + if (tmpDirB) rmSync(tmpDirB, { recursive: true, force: true }); 250 + }); 251 + 252 + it("full flow: self-sync, libp2p cross-sync, XRPC serve, incremental re-sync, mutual offers", async () => { 253 + // ================================================================ 254 + // Step 1: Setup — create repos, mock PDS, start two servers 255 + // ================================================================ 256 + tmpDirA = mkdtempSync(join(tmpdir(), "capstone-a-")); 257 + tmpDirB = mkdtempSync(join(tmpdir(), "capstone-b-")); 258 + 259 + // Create repos with update support for incremental sync 260 + const { initialCar: aliceInitialCar, fullCar: aliceFullCar } = await createTestRepoWithUpdate( 261 + DID_A, 262 + [ 263 + { collection: "app.bsky.feed.post", rkey: "a1", record: { text: "Alice post 1", createdAt: new Date().toISOString() } }, 264 + { collection: "app.bsky.feed.post", rkey: "a2", record: { text: "Alice post 2", createdAt: new Date().toISOString() } }, 265 + ], 266 + [ 267 + { collection: "app.bsky.feed.post", rkey: "a3", record: { text: "Alice post 3 (update)", createdAt: new Date().toISOString() } }, 268 + ], 269 + ); 270 + const { initialCar: bobInitialCar, fullCar: bobFullCar } = await createTestRepoWithUpdate( 271 + DID_B, 272 + [ 273 + { collection: "app.bsky.feed.post", rkey: "b1", record: { text: "Bob post 1", createdAt: new Date().toISOString() } }, 274 + ], 275 + [ 276 + { collection: "app.bsky.feed.post", rkey: "b2", record: { text: "Bob post 2 (update)", createdAt: new Date().toISOString() } }, 277 + ], 278 + ); 279 + 280 + // Start mock PDS with initial CARs 281 + mockPds = await startEnhancedMockPds([ 282 + { did: DID_A, carBytes: aliceInitialCar }, 283 + { did: DID_B, carBytes: bobInitialCar }, 284 + ]); 285 + 286 + // Pre-inject offer records so they're available for discovery later 287 + mockPds.addRecord(DID_A, OFFER_NSID, didToRkey(DID_B), { 288 + $type: OFFER_NSID, 289 + subject: DID_B, 290 + minCopies: 2, 291 + intervalSec: 300, 292 + priority: 50, 293 + createdAt: new Date().toISOString(), 294 + }); 295 + mockPds.addRecord(DID_B, OFFER_NSID, didToRkey(DID_A), { 296 + $type: OFFER_NSID, 297 + subject: DID_A, 298 + minCopies: 3, 299 + intervalSec: 600, 300 + priority: 75, 301 + createdAt: new Date().toISOString(), 302 + }); 303 + 304 + const resolver = createMockDidResolver({ 305 + [DID_A]: mockPds.url, 306 + [DID_B]: mockPds.url, 307 + }); 308 + 309 + // Pre-set identities 310 + const configA = baseConfig(tmpDirA); 311 + configA.DID = DID_A; 312 + configA.HANDLE = "alice.test"; 313 + presetIdentity(tmpDirA, DID_A, "alice.test"); 314 + 315 + const configB = baseConfig(tmpDirB); 316 + configB.DID = DID_B; 317 + configB.HANDLE = "bob.test"; 318 + presetIdentity(tmpDirB, DID_B, "bob.test"); 319 + 320 + // Start both servers with real networking 321 + handleA = await startServer(configA, { didResolver: resolver }); 322 + handleB = await startServer(configB, { didResolver: resolver }); 323 + 324 + expect(handleA.replicationManager).toBeDefined(); 325 + expect(handleB.replicationManager).toBeDefined(); 326 + expect(handleA.ipfsService).toBeDefined(); 327 + expect(handleB.ipfsService).toBeDefined(); 328 + 329 + const rmA = handleA.replicationManager!; 330 + const rmB = handleB.replicationManager!; 331 + const ipfsA = handleA.ipfsService!; 332 + const ipfsB = handleB.ipfsService!; 333 + 334 + // Prevent periodic sync from interfering — set stopped flag directly 335 + // (don't call stop() which would unsubscribe topics) 336 + (rmA as unknown as { stopped: boolean }).stopped = true; 337 + (rmB as unknown as { stopped: boolean }).stopped = true; 338 + const syncTimerA = (rmA as unknown as { syncTimer: ReturnType<typeof setInterval> | null }).syncTimer; 339 + if (syncTimerA) { clearInterval(syncTimerA); (rmA as unknown as { syncTimer: null }).syncTimer = null; } 340 + const syncTimerB = (rmB as unknown as { syncTimer: ReturnType<typeof setInterval> | null }).syncTimer; 341 + if (syncTimerB) { clearInterval(syncTimerB); (rmB as unknown as { syncTimer: null }).syncTimer = null; } 342 + 343 + // Verify both nodes have peer IDs (networking is on) 344 + const peerIdA = ipfsA.getPeerId()!; 345 + const peerIdB = ipfsB.getPeerId()!; 346 + expect(peerIdA).toBeTruthy(); 347 + expect(peerIdB).toBeTruthy(); 348 + expect(peerIdA).not.toBe(peerIdB); 349 + 350 + const addrsA = ipfsA.getMultiaddrs(); 351 + const addrsB = ipfsB.getMultiaddrs(); 352 + expect(addrsA.length).toBeGreaterThan(0); 353 + expect(addrsB.length).toBeGreaterThan(0); 354 + 355 + // ================================================================ 356 + // Step 2: Self-sync — each node syncs its own DID 357 + // ================================================================ 358 + await rmA.addDid(DID_A); 359 + await waitFor(() => { 360 + const state = rmA.getSyncStorage().getState(DID_A); 361 + return state?.status === "synced"; 362 + }, 15_000); 363 + 364 + await rmB.addDid(DID_B); 365 + await waitFor(() => { 366 + const state = rmB.getSyncStorage().getState(DID_B); 367 + return state?.status === "synced"; 368 + }, 15_000); 369 + 370 + // Verify self-sync completed 371 + expect(rmA.getSyncStorage().getState(DID_A)?.status).toBe("synced"); 372 + expect(rmB.getSyncStorage().getState(DID_B)?.status).toBe("synced"); 373 + 374 + // ================================================================ 375 + // Step 3: Peer dial + peer record injection 376 + // ================================================================ 377 + // Add org.p2pds.peer/self records to mock PDS so discoverPeer() 378 + // finds the real peer info during sync. Each DID's peer record 379 + // contains the multiaddrs of the node that hosts that DID. 380 + const PEER_NSID = "org.p2pds.peer"; 381 + mockPds.addRecord(DID_A, PEER_NSID, "self", { 382 + $type: PEER_NSID, 383 + peerId: peerIdA, 384 + multiaddrs: addrsA, 385 + createdAt: new Date().toISOString(), 386 + }); 387 + mockPds.addRecord(DID_B, PEER_NSID, "self", { 388 + $type: PEER_NSID, 389 + peerId: peerIdB, 390 + multiaddrs: addrsB, 391 + createdAt: new Date().toISOString(), 392 + }); 393 + 394 + // Connect the two nodes via libp2p 395 + await ipfsA.dial(addrsB[0]!); 396 + await waitFor( 397 + () => ipfsA.getConnectionCount() > 0 && ipfsB.getConnectionCount() > 0, 398 + 10_000, 399 + ); 400 + 401 + // ================================================================ 402 + // Step 4: Cross-sync — addDid discovers peer info, syncs via libp2p 403 + // ================================================================ 404 + // addDid fires background syncDid → discoverPeer finds peer record → 405 + // peer info populated → libp2p path activated. 406 + // Sync sequentially to avoid concurrent stream conflicts on the 407 + // same libp2p connection (yamux muxer doesn't handle concurrent 408 + // dialProtocol from both sides well). 409 + await rmA.addDid(DID_B); 410 + await waitFor(() => { 411 + const state = rmA.getSyncStorage().getState(DID_B); 412 + if (state?.status !== "synced") return false; 413 + // Also wait for sync_history to be completed (not just state table) 414 + const history = rmA.getSyncStorage().getSyncHistory(DID_B, 5); 415 + return history.some((h) => h.sourceType === "libp2p" && h.status === "success"); 416 + }, 30_000); 417 + 418 + await rmB.addDid(DID_A); 419 + await waitFor(() => { 420 + const state = rmB.getSyncStorage().getState(DID_A); 421 + if (state?.status !== "synced") return false; 422 + const history = rmB.getSyncStorage().getSyncHistory(DID_A, 5); 423 + return history.some((h) => h.sourceType === "libp2p" && h.status === "success"); 424 + }, 30_000); 425 + 426 + // Verify peer info was discovered and stored 427 + const stateAforB = rmA.getSyncStorage().getState(DID_B); 428 + expect(stateAforB?.peerId).toBe(peerIdB); 429 + expect(stateAforB?.peerMultiaddrs?.length).toBeGreaterThan(0); 430 + 431 + const stateBforA = rmB.getSyncStorage().getState(DID_A); 432 + expect(stateBforA?.peerId).toBe(peerIdA); 433 + expect(stateBforA?.peerMultiaddrs?.length).toBeGreaterThan(0); 434 + 435 + // Verify sync used libp2p source (the initial sync should have used it 436 + // since peer info was discovered before the fetch step) 437 + const historyA = rmA.getSyncStorage().getSyncHistory(DID_B, 5); 438 + const libp2pSyncA = historyA.find((h) => h.sourceType === "libp2p"); 439 + expect(libp2pSyncA).toBeDefined(); 440 + expect(libp2pSyncA!.status).toBe("success"); 441 + 442 + const historyB = rmB.getSyncStorage().getSyncHistory(DID_A, 5); 443 + const libp2pSyncB = historyB.find((h) => h.sourceType === "libp2p"); 444 + expect(libp2pSyncB).toBeDefined(); 445 + expect(libp2pSyncB!.status).toBe("success"); 446 + 447 + // ================================================================ 448 + // Step 6: Verify cross-serving via XRPC 449 + // ================================================================ 450 + 451 + // Node A serves Bob's repo via getRepo 452 + const getRepoA = await fetch( 453 + `${handleA.url}/xrpc/com.atproto.sync.getRepo?did=${DID_B}`, 454 + ); 455 + expect(getRepoA.status).toBe(200); 456 + expect(getRepoA.headers.get("content-type")).toBe("application/vnd.ipld.car"); 457 + const carBytesFromA = new Uint8Array(await getRepoA.arrayBuffer()); 458 + expect(carBytesFromA.length).toBeGreaterThan(0); 459 + 460 + // Node B serves Alice's repo via getRepo 461 + const getRepoB = await fetch( 462 + `${handleB.url}/xrpc/com.atproto.sync.getRepo?did=${DID_A}`, 463 + ); 464 + expect(getRepoB.status).toBe(200); 465 + const carBytesFromB = new Uint8Array(await getRepoB.arrayBuffer()); 466 + expect(carBytesFromB.length).toBeGreaterThan(0); 467 + 468 + // Node A can read Bob's record 469 + const recordA = await fetch( 470 + `${handleA.url}/xrpc/com.atproto.repo.getRecord?repo=${DID_B}&collection=app.bsky.feed.post&rkey=b1`, 471 + ); 472 + expect(recordA.status).toBe(200); 473 + const recA = (await recordA.json()) as { value: { text: string } }; 474 + expect(recA.value.text).toBe("Bob post 1"); 475 + 476 + // Node B can read Alice's record 477 + const recordB = await fetch( 478 + `${handleB.url}/xrpc/com.atproto.repo.getRecord?repo=${DID_A}&collection=app.bsky.feed.post&rkey=a1`, 479 + ); 480 + expect(recordB.status).toBe(200); 481 + const recB = (await recordB.json()) as { value: { text: string } }; 482 + expect(recB.value.text).toBe("Alice post 1"); 483 + 484 + // Node A can describe Bob's repo 485 + const describeA = await fetch( 486 + `${handleA.url}/xrpc/com.atproto.repo.describeRepo?repo=${DID_B}`, 487 + ); 488 + expect(describeA.status).toBe(200); 489 + const descA = (await describeA.json()) as { did: string; collections: string[] }; 490 + expect(descA.did).toBe(DID_B); 491 + expect(descA.collections).toContain("app.bsky.feed.post"); 492 + 493 + // Node A getRepoStatus for Bob 494 + const repoStatusA = await fetch( 495 + `${handleA.url}/xrpc/com.atproto.sync.getRepoStatus?did=${DID_B}`, 496 + ); 497 + expect(repoStatusA.status).toBe(200); 498 + const rsA = (await repoStatusA.json()) as { did: string; rev: string | null }; 499 + expect(rsA.did).toBe(DID_B); 500 + expect(rsA.rev).toBeTruthy(); 501 + 502 + // ================================================================ 503 + // Step 7: Update repos + incremental re-sync via libp2p 504 + // ================================================================ 505 + // Update mock PDS with full CARs (which include additional records) 506 + mockPds.updateAccount(DID_A, aliceFullCar); 507 + mockPds.updateAccount(DID_B, bobFullCar); 508 + 509 + // First, re-sync each node's own DID to update local state 510 + // (so the peer can serve the updated data) 511 + await rmA.syncDid(DID_A, "manual"); 512 + await rmB.syncDid(DID_B, "manual"); 513 + 514 + // Now cross-sync: each node fetches the other's updated data via libp2p 515 + await rmA.syncDid(DID_B, "manual"); 516 + await rmB.syncDid(DID_A, "manual"); 517 + 518 + // Verify incremental sync occurred 519 + const historyA2 = rmA.getSyncStorage().getSyncHistory(DID_B, 10); 520 + const incrementalSyncA = historyA2.find((h) => h.incremental && h.sourceType === "libp2p"); 521 + expect(incrementalSyncA).toBeDefined(); 522 + expect(incrementalSyncA!.status).toBe("success"); 523 + // Verify the incremental sync actually transferred data 524 + expect(incrementalSyncA!.carBytes).toBeGreaterThan(0); 525 + 526 + // Verify the new records are accessible 527 + const newRecordA = await fetch( 528 + `${handleA.url}/xrpc/com.atproto.repo.getRecord?repo=${DID_B}&collection=app.bsky.feed.post&rkey=b2`, 529 + ); 530 + expect(newRecordA.status).toBe(200); 531 + const newRecA = (await newRecordA.json()) as { value: { text: string } }; 532 + expect(newRecA.value.text).toBe("Bob post 2 (update)"); 533 + 534 + const newRecordB = await fetch( 535 + `${handleB.url}/xrpc/com.atproto.repo.getRecord?repo=${DID_A}&collection=app.bsky.feed.post&rkey=a3`, 536 + ); 537 + expect(newRecordB.status).toBe(200); 538 + const newRecB = (await newRecordB.json()) as { value: { text: string } }; 539 + expect(newRecB.value.text).toBe("Alice post 3 (update)"); 540 + 541 + // ================================================================ 542 + // Step 8: Mutual offers -> auto-policies 543 + // ================================================================ 544 + // Inject PolicyEngine and RecordWriter into both ReplicationManagers 545 + const peA = new PolicyEngine({ version: 1, policies: [] }); 546 + const peB = new PolicyEngine({ version: 1, policies: [] }); 547 + (rmA as unknown as { policyEngine: PolicyEngine }).policyEngine = peA; 548 + (rmB as unknown as { policyEngine: PolicyEngine }).policyEngine = peB; 549 + rmA.setPdsClient(createMockRecordWriter(DID_A, mockPds), DID_A); 550 + rmB.setPdsClient(createMockRecordWriter(DID_B, mockPds), DID_B); 551 + 552 + // Discover offers 553 + const offerManagerA = rmA.getOfferManager(); 554 + const offerManagerB = rmB.getOfferManager(); 555 + expect(offerManagerA).toBeDefined(); 556 + expect(offerManagerB).toBeDefined(); 557 + 558 + const statesA = rmA.getSyncStates(); 559 + const peersA = statesA.filter((s) => s.pdsEndpoint).map((s) => ({ did: s.did, pdsEndpoint: s.pdsEndpoint })); 560 + const agreementsA = await offerManagerA!.discoverAndSync(peersA); 561 + 562 + const statesB = rmB.getSyncStates(); 563 + const peersB = statesB.filter((s) => s.pdsEndpoint).map((s) => ({ did: s.did, pdsEndpoint: s.pdsEndpoint })); 564 + const agreementsB = await offerManagerB!.discoverAndSync(peersB); 565 + 566 + // Both should detect one mutual agreement 567 + expect(agreementsA.length).toBe(1); 568 + expect(agreementsA[0]!.counterpartyDid).toBe(DID_B); 569 + expect(agreementsB.length).toBe(1); 570 + expect(agreementsB[0]!.counterpartyDid).toBe(DID_A); 571 + 572 + // Verify effective params: max(minCopies), min(intervalSec), max(priority) 573 + expect(agreementsA[0]!.effectiveParams.minCopies).toBe(3); 574 + expect(agreementsA[0]!.effectiveParams.intervalSec).toBe(300); 575 + expect(agreementsA[0]!.effectiveParams.priority).toBe(75); 576 + 577 + // Verify policies were created 578 + const policiesA = peA.getPolicies(); 579 + expect(policiesA.length).toBe(1); 580 + expect(policiesA[0]!.id).toBe(`p2p:${DID_B}`); 581 + expect(policiesA[0]!.replication.minCopies).toBe(3); 582 + 583 + const policiesB = peB.getPolicies(); 584 + expect(policiesB.length).toBe(1); 585 + expect(policiesB[0]!.id).toBe(`p2p:${DID_A}`); 586 + }, 120_000); 587 + });