atproto user agency toolkit for individuals and groups
7
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add bidirectional replication E2E tests with mutual offer discovery

Two tests validating the full bidirectional loop:
1. Two nodes sync each other's data and serve it via all sync/repo endpoints
(getRepo, getRepoStatus, listRepos, getRecord, listRecords, describeRepo)
2. Mutual offers create P2P replication policies with correct parameter merging
(max minCopies, min intervalSec, max priority)

Uses enhanced mock PDS with configurable records per DID/collection.

+518
+518
src/bidirectional-replication.test.ts
··· 1 + /** 2 + * Bidirectional replication E2E test. 3 + * 4 + * Two p2pds nodes, each with a simulated user account on a mock PDS. 5 + * Each node tracks the other's DID, syncs data, publishes offers, 6 + * discovers mutual agreements, and serves replicated data via sync endpoints. 7 + */ 8 + 9 + import { describe, it, expect, afterEach } from "vitest"; 10 + import { mkdtempSync, rmSync } from "node:fs"; 11 + import { tmpdir } from "node:os"; 12 + import { join, resolve } from "node:path"; 13 + import Database from "better-sqlite3"; 14 + 15 + import type { Config } from "./config.js"; 16 + import { startServer, type ServerHandle } from "./start.js"; 17 + import { 18 + createTestRepo, 19 + startMockPds, 20 + type MockPds, 21 + } from "./replication/test-helpers.js"; 22 + import type { DidResolver, DidDocument } from "./did-resolver.js"; 23 + import { PolicyEngine } from "./policy/engine.js"; 24 + import { OFFER_NSID, didToRkey } from "./replication/types.js"; 25 + 26 + /** DID for Node A's user identity. */ 27 + const DID_NODE_A = "did:plc:node-a-user"; 28 + /** DID for Node B's user identity. */ 29 + const DID_NODE_B = "did:plc:node-b-user"; 30 + 31 + function baseConfig(dataDir: string): Config { 32 + return { 33 + PDS_HOSTNAME: "local.test", 34 + AUTH_TOKEN: "test-auth-token", 35 + JWT_SECRET: "test-jwt-secret", 36 + PASSWORD_HASH: "$2a$10$test", 37 + DATA_DIR: dataDir, 38 + PORT: 0, 39 + IPFS_ENABLED: true, 40 + IPFS_NETWORKING: false, 41 + REPLICATE_DIDS: [], 42 + FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 43 + FIREHOSE_ENABLED: false, 44 + RATE_LIMIT_ENABLED: false, 45 + RATE_LIMIT_READ_PER_MIN: 300, 46 + RATE_LIMIT_SYNC_PER_MIN: 30, 47 + RATE_LIMIT_SESSION_PER_MIN: 10, 48 + RATE_LIMIT_WRITE_PER_MIN: 200, 49 + RATE_LIMIT_CHALLENGE_PER_MIN: 20, 50 + RATE_LIMIT_MAX_CONNECTIONS: 100, 51 + RATE_LIMIT_FIREHOSE_PER_IP: 3, 52 + OAUTH_ENABLED: false, 53 + }; 54 + } 55 + 56 + /** 57 + * Enhanced mock PDS that serves configurable records per DID per collection. 58 + * Supports adding offer records dynamically to simulate users publishing offers. 59 + */ 60 + interface EnhancedMockPds extends MockPds { 61 + addRecord(did: string, collection: string, rkey: string, value: unknown): void; 62 + } 63 + 64 + async function startEnhancedMockPds( 65 + accounts: Array<{ did: string; carBytes: Uint8Array }>, 66 + ): Promise<EnhancedMockPds> { 67 + const { createServer } = await import("node:http"); 68 + 69 + const accountMap = new Map<string, Uint8Array>(); 70 + for (const a of accounts) { 71 + accountMap.set(a.did, a.carBytes); 72 + } 73 + 74 + // Records store: did -> collection -> rkey -> { uri, cid, value } 75 + const records = new Map<string, Map<string, Map<string, { uri: string; cid: string; value: unknown }>>>(); 76 + 77 + const server = createServer((req, res) => { 78 + const url = new URL(req.url ?? "/", "http://localhost"); 79 + const pathname = url.pathname; 80 + 81 + if (pathname === "/xrpc/com.atproto.sync.getRepo") { 82 + const did = url.searchParams.get("did"); 83 + if (!did || !accountMap.has(did)) { 84 + res.writeHead(404, { "Content-Type": "application/json" }); 85 + res.end(JSON.stringify({ error: "RepoNotFound" })); 86 + return; 87 + } 88 + const carBytes = accountMap.get(did)!; 89 + res.writeHead(200, { 90 + "Content-Type": "application/vnd.ipld.car", 91 + "Content-Length": String(carBytes.length), 92 + }); 93 + res.end(Buffer.from(carBytes)); 94 + return; 95 + } 96 + 97 + if (pathname === "/xrpc/com.atproto.repo.listRecords") { 98 + const did = url.searchParams.get("repo"); 99 + const collection = url.searchParams.get("collection"); 100 + if (!did || !collection) { 101 + res.writeHead(400, { "Content-Type": "application/json" }); 102 + res.end(JSON.stringify({ error: "InvalidRequest" })); 103 + return; 104 + } 105 + const didRecords = records.get(did)?.get(collection); 106 + const recordList = didRecords ? Array.from(didRecords.values()) : []; 107 + res.writeHead(200, { "Content-Type": "application/json" }); 108 + res.end(JSON.stringify({ records: recordList })); 109 + return; 110 + } 111 + 112 + if (pathname === "/xrpc/com.atproto.repo.getRecord") { 113 + const did = url.searchParams.get("repo"); 114 + const collection = url.searchParams.get("collection"); 115 + const rkey = url.searchParams.get("rkey"); 116 + if (!did || !collection || !rkey) { 117 + res.writeHead(400, { "Content-Type": "application/json" }); 118 + res.end(JSON.stringify({ error: "InvalidRequest" })); 119 + return; 120 + } 121 + const record = records.get(did)?.get(collection)?.get(rkey); 122 + if (!record) { 123 + res.writeHead(404, { "Content-Type": "application/json" }); 124 + res.end(JSON.stringify({ error: "RecordNotFound" })); 125 + return; 126 + } 127 + res.writeHead(200, { "Content-Type": "application/json" }); 128 + res.end(JSON.stringify(record)); 129 + return; 130 + } 131 + 132 + res.writeHead(404, { "Content-Type": "application/json" }); 133 + res.end(JSON.stringify({ error: "NotFound" })); 134 + }); 135 + 136 + return new Promise((resolve) => { 137 + server.listen(0, "127.0.0.1", () => { 138 + const addr = server.address() as { port: number }; 139 + const pdsUrl = `http://127.0.0.1:${addr.port}`; 140 + resolve({ 141 + url: pdsUrl, 142 + port: addr.port, 143 + close: () => new Promise<void>((res) => server.close(() => res())), 144 + updateAccount: (did: string, carBytes: Uint8Array) => { 145 + accountMap.set(did, carBytes); 146 + }, 147 + addRecord: (did: string, collection: string, rkey: string, value: unknown) => { 148 + if (!records.has(did)) records.set(did, new Map()); 149 + const didMap = records.get(did)!; 150 + if (!didMap.has(collection)) didMap.set(collection, new Map()); 151 + didMap.get(collection)!.set(rkey, { 152 + uri: `at://${did}/${collection}/${rkey}`, 153 + cid: "bafytest", 154 + value, 155 + }); 156 + }, 157 + }); 158 + }); 159 + }); 160 + } 161 + 162 + function createMockDidResolver(mapping: Record<string, string>): DidResolver { 163 + return { 164 + resolve: async (did: string): Promise<DidDocument | null> => { 165 + const pdsUrl = mapping[did]; 166 + if (!pdsUrl) return null; 167 + return { 168 + id: did, 169 + service: [ 170 + { 171 + id: "#atproto_pds", 172 + type: "AtprotoPersonalDataServer", 173 + serviceEndpoint: pdsUrl, 174 + }, 175 + ], 176 + } as unknown as DidDocument; 177 + }, 178 + } as DidResolver; 179 + } 180 + 181 + describe("Bidirectional replication E2E", () => { 182 + let tmpDirA: string; 183 + let tmpDirB: string; 184 + let handleA: ServerHandle | undefined; 185 + let handleB: ServerHandle | undefined; 186 + let mockPds: EnhancedMockPds | undefined; 187 + 188 + afterEach(async () => { 189 + if (handleA) { await handleA.close(); handleA = undefined; } 190 + if (handleB) { await handleB.close(); handleB = undefined; } 191 + if (mockPds) { await mockPds.close(); mockPds = undefined; } 192 + if (tmpDirA) rmSync(tmpDirA, { recursive: true, force: true }); 193 + if (tmpDirB) rmSync(tmpDirB, { recursive: true, force: true }); 194 + }); 195 + 196 + it("two nodes sync each other's data and serve it via sync endpoints", async () => { 197 + tmpDirA = mkdtempSync(join(tmpdir(), "bidir-a-")); 198 + tmpDirB = mkdtempSync(join(tmpdir(), "bidir-b-")); 199 + 200 + // Create test repos for each user's account 201 + const aliceCar = await createTestRepo(DID_NODE_A, [ 202 + { collection: "app.bsky.feed.post", rkey: "a1", record: { text: "Alice post", createdAt: new Date().toISOString() } }, 203 + { collection: "app.bsky.feed.post", rkey: "a2", record: { text: "Alice post 2", createdAt: new Date().toISOString() } }, 204 + ]); 205 + const bobCar = await createTestRepo(DID_NODE_B, [ 206 + { collection: "app.bsky.feed.post", rkey: "b1", record: { text: "Bob post", createdAt: new Date().toISOString() } }, 207 + ]); 208 + 209 + // Mock PDS serves both accounts 210 + mockPds = await startEnhancedMockPds([ 211 + { did: DID_NODE_A, carBytes: aliceCar }, 212 + { did: DID_NODE_B, carBytes: bobCar }, 213 + ]); 214 + const resolver = createMockDidResolver({ 215 + [DID_NODE_A]: mockPds.url, 216 + [DID_NODE_B]: mockPds.url, 217 + }); 218 + 219 + // Start two servers — each with their own identity 220 + const configA = baseConfig(tmpDirA); 221 + const configB = baseConfig(tmpDirB); 222 + 223 + // Pre-set identities (simulating OAuth already done) 224 + const dbA = new Database(resolve(tmpDirA, "pds.db")); 225 + dbA.pragma("journal_mode = WAL"); 226 + dbA.exec("CREATE TABLE IF NOT EXISTS node_identity (did TEXT PRIMARY KEY, handle TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')))"); 227 + dbA.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run(DID_NODE_A, "alice.test"); 228 + dbA.close(); 229 + configA.DID = DID_NODE_A; 230 + configA.HANDLE = "alice.test"; 231 + 232 + const dbB = new Database(resolve(tmpDirB, "pds.db")); 233 + dbB.pragma("journal_mode = WAL"); 234 + dbB.exec("CREATE TABLE IF NOT EXISTS node_identity (did TEXT PRIMARY KEY, handle TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')))"); 235 + dbB.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run(DID_NODE_B, "bob.test"); 236 + dbB.close(); 237 + configB.DID = DID_NODE_B; 238 + configB.HANDLE = "bob.test"; 239 + 240 + handleA = await startServer(configA, { didResolver: resolver }); 241 + handleB = await startServer(configB, { didResolver: resolver }); 242 + 243 + // Both nodes should have replication managers 244 + expect(handleA.replicationManager).toBeDefined(); 245 + expect(handleB.replicationManager).toBeDefined(); 246 + 247 + // Node A adds Node B's DID, Node B adds Node A's DID 248 + const addBToA = await fetch(`${handleA.url}/xrpc/org.p2pds.admin.addDid`, { 249 + method: "POST", 250 + headers: { Authorization: `Bearer ${configA.AUTH_TOKEN}`, "Content-Type": "application/json" }, 251 + body: JSON.stringify({ did: DID_NODE_B }), 252 + }); 253 + expect(addBToA.status).toBe(200); 254 + 255 + const addAToB = await fetch(`${handleB.url}/xrpc/org.p2pds.admin.addDid`, { 256 + method: "POST", 257 + headers: { Authorization: `Bearer ${configB.AUTH_TOKEN}`, "Content-Type": "application/json" }, 258 + body: JSON.stringify({ did: DID_NODE_A }), 259 + }); 260 + expect(addAToB.status).toBe(200); 261 + 262 + // Trigger sync on both 263 + await fetch(`${handleA.url}/xrpc/org.p2pds.replication.syncNow`, { 264 + method: "POST", 265 + headers: { Authorization: `Bearer ${configA.AUTH_TOKEN}` }, 266 + }); 267 + await fetch(`${handleB.url}/xrpc/org.p2pds.replication.syncNow`, { 268 + method: "POST", 269 + headers: { Authorization: `Bearer ${configB.AUTH_TOKEN}` }, 270 + }); 271 + 272 + // Wait for async sync 273 + await new Promise((r) => setTimeout(r, 3000)); 274 + 275 + // Verify Node A synced Bob's data 276 + const statusA = await fetch( 277 + `${handleA.url}/xrpc/org.p2pds.admin.getDidStatus?did=${DID_NODE_B}`, 278 + { headers: { Authorization: `Bearer ${configA.AUTH_TOKEN}` } }, 279 + ); 280 + const dsA = (await statusA.json()) as { did: string; blockCount: number; syncState: { status: string } }; 281 + expect(dsA.did).toBe(DID_NODE_B); 282 + expect(dsA.blockCount).toBeGreaterThan(0); 283 + expect(dsA.syncState.status).toBe("synced"); 284 + 285 + // Verify Node B synced Alice's data 286 + const statusB = await fetch( 287 + `${handleB.url}/xrpc/org.p2pds.admin.getDidStatus?did=${DID_NODE_A}`, 288 + { headers: { Authorization: `Bearer ${configB.AUTH_TOKEN}` } }, 289 + ); 290 + const dsB = (await statusB.json()) as { did: string; blockCount: number; syncState: { status: string } }; 291 + expect(dsB.did).toBe(DID_NODE_A); 292 + expect(dsB.blockCount).toBeGreaterThan(0); 293 + expect(dsB.syncState.status).toBe("synced"); 294 + 295 + // ---- Verify sync endpoints serve replicated data ---- 296 + 297 + // Node A serves Bob's repo via getRepo 298 + const getRepoA = await fetch( 299 + `${handleA.url}/xrpc/com.atproto.sync.getRepo?did=${DID_NODE_B}`, 300 + ); 301 + expect(getRepoA.status).toBe(200); 302 + expect(getRepoA.headers.get("content-type")).toBe("application/vnd.ipld.car"); 303 + const carBytesA = new Uint8Array(await getRepoA.arrayBuffer()); 304 + expect(carBytesA.length).toBeGreaterThan(0); 305 + 306 + // Node B serves Alice's repo via getRepo 307 + const getRepoB = await fetch( 308 + `${handleB.url}/xrpc/com.atproto.sync.getRepo?did=${DID_NODE_A}`, 309 + ); 310 + expect(getRepoB.status).toBe(200); 311 + const carBytesB = new Uint8Array(await getRepoB.arrayBuffer()); 312 + expect(carBytesB.length).toBeGreaterThan(0); 313 + 314 + // Node A serves Bob's repo via getRepoStatus 315 + const repoStatusA = await fetch( 316 + `${handleA.url}/xrpc/com.atproto.sync.getRepoStatus?did=${DID_NODE_B}`, 317 + ); 318 + expect(repoStatusA.status).toBe(200); 319 + const rsA = (await repoStatusA.json()) as { did: string; active: boolean; rev: string | null }; 320 + expect(rsA.did).toBe(DID_NODE_B); 321 + expect(rsA.rev).toBeTruthy(); 322 + 323 + // Node B lists all repos (should include Alice's) 324 + const listReposB = await fetch( 325 + `${handleB.url}/xrpc/com.atproto.sync.listRepos`, 326 + ); 327 + expect(listReposB.status).toBe(200); 328 + const reposB = (await listReposB.json()) as { repos: Array<{ did: string }> }; 329 + const replicatedDids = reposB.repos.map((r) => r.did); 330 + expect(replicatedDids).toContain(DID_NODE_A); 331 + 332 + // Node A can read Bob's records via repo.getRecord 333 + const recordA = await fetch( 334 + `${handleA.url}/xrpc/com.atproto.repo.getRecord?repo=${DID_NODE_B}&collection=app.bsky.feed.post&rkey=b1`, 335 + ); 336 + expect(recordA.status).toBe(200); 337 + const recA = (await recordA.json()) as { uri: string; value: { text: string } }; 338 + expect(recA.value.text).toBe("Bob post"); 339 + 340 + // Node B can read Alice's records via repo.getRecord 341 + const recordB = await fetch( 342 + `${handleB.url}/xrpc/com.atproto.repo.getRecord?repo=${DID_NODE_A}&collection=app.bsky.feed.post&rkey=a1`, 343 + ); 344 + expect(recordB.status).toBe(200); 345 + const recB = (await recordB.json()) as { uri: string; value: { text: string } }; 346 + expect(recB.value.text).toBe("Alice post"); 347 + 348 + // Node B can list Alice's records 349 + const listRecsB = await fetch( 350 + `${handleB.url}/xrpc/com.atproto.repo.listRecords?repo=${DID_NODE_A}&collection=app.bsky.feed.post`, 351 + ); 352 + expect(listRecsB.status).toBe(200); 353 + const recsB = (await listRecsB.json()) as { records: Array<{ value: { text: string } }> }; 354 + expect(recsB.records.length).toBe(2); 355 + 356 + // Node A can describe Bob's repo 357 + const describeA = await fetch( 358 + `${handleA.url}/xrpc/com.atproto.repo.describeRepo?repo=${DID_NODE_B}`, 359 + ); 360 + expect(describeA.status).toBe(200); 361 + const descA = (await describeA.json()) as { did: string; collections: string[] }; 362 + expect(descA.did).toBe(DID_NODE_B); 363 + expect(descA.collections).toContain("app.bsky.feed.post"); 364 + }, 30_000); 365 + 366 + it("mutual offers create P2P replication policies", async () => { 367 + tmpDirA = mkdtempSync(join(tmpdir(), "bidir-offer-a-")); 368 + tmpDirB = mkdtempSync(join(tmpdir(), "bidir-offer-b-")); 369 + 370 + // Create minimal repos 371 + const aliceCar = await createTestRepo(DID_NODE_A, [ 372 + { collection: "app.bsky.feed.post", rkey: "x1", record: { text: "test", createdAt: new Date().toISOString() } }, 373 + ]); 374 + const bobCar = await createTestRepo(DID_NODE_B, [ 375 + { collection: "app.bsky.feed.post", rkey: "y1", record: { text: "test", createdAt: new Date().toISOString() } }, 376 + ]); 377 + 378 + // Mock PDS with offer records 379 + mockPds = await startEnhancedMockPds([ 380 + { did: DID_NODE_A, carBytes: aliceCar }, 381 + { did: DID_NODE_B, carBytes: bobCar }, 382 + ]); 383 + 384 + // Simulate both users having published offers for each other 385 + // Node A's user published an offer for Node B's DID 386 + mockPds.addRecord(DID_NODE_A, OFFER_NSID, didToRkey(DID_NODE_B), { 387 + $type: OFFER_NSID, 388 + subject: DID_NODE_B, 389 + minCopies: 2, 390 + intervalSec: 300, 391 + priority: 50, 392 + createdAt: new Date().toISOString(), 393 + }); 394 + // Node B's user published an offer for Node A's DID 395 + mockPds.addRecord(DID_NODE_B, OFFER_NSID, didToRkey(DID_NODE_A), { 396 + $type: OFFER_NSID, 397 + subject: DID_NODE_A, 398 + minCopies: 3, 399 + intervalSec: 600, 400 + priority: 75, 401 + createdAt: new Date().toISOString(), 402 + }); 403 + 404 + const resolver = createMockDidResolver({ 405 + [DID_NODE_A]: mockPds.url, 406 + [DID_NODE_B]: mockPds.url, 407 + }); 408 + 409 + // Create configs with policy engines and identities pre-set 410 + const configA = baseConfig(tmpDirA); 411 + configA.DID = DID_NODE_A; 412 + configA.HANDLE = "alice.test"; 413 + const dbA = new Database(resolve(tmpDirA, "pds.db")); 414 + dbA.pragma("journal_mode = WAL"); 415 + dbA.exec("CREATE TABLE IF NOT EXISTS node_identity (did TEXT PRIMARY KEY, handle TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')))"); 416 + dbA.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run(DID_NODE_A, "alice.test"); 417 + dbA.close(); 418 + 419 + const configB = baseConfig(tmpDirB); 420 + configB.DID = DID_NODE_B; 421 + configB.HANDLE = "bob.test"; 422 + const dbB = new Database(resolve(tmpDirB, "pds.db")); 423 + dbB.pragma("journal_mode = WAL"); 424 + dbB.exec("CREATE TABLE IF NOT EXISTS node_identity (did TEXT PRIMARY KEY, handle TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')))"); 425 + dbB.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run(DID_NODE_B, "bob.test"); 426 + dbB.close(); 427 + 428 + // We need to inject PolicyEngine + a mock RecordWriter into the ReplicationManager 429 + // The simplest approach: start the servers, then use the OfferManager directly 430 + handleA = await startServer(configA, { didResolver: resolver }); 431 + handleB = await startServer(configB, { didResolver: resolver }); 432 + 433 + const rmA = handleA.replicationManager!; 434 + const rmB = handleB.replicationManager!; 435 + 436 + // Create mock RecordWriters that use the mock PDS records store 437 + // (In production this would be the PdsClient via OAuth) 438 + const mockWriterA = createMockRecordWriter(DID_NODE_A, mockPds); 439 + const mockWriterB = createMockRecordWriter(DID_NODE_B, mockPds); 440 + 441 + // Inject policy engine and setPdsClient 442 + const peA = new PolicyEngine({ version: 1, policies: [] }); 443 + const peB = new PolicyEngine({ version: 1, policies: [] }); 444 + // Access private field to set policy engine — test-only hack 445 + (rmA as unknown as { policyEngine: PolicyEngine }).policyEngine = peA; 446 + (rmB as unknown as { policyEngine: PolicyEngine }).policyEngine = peB; 447 + rmA.setPdsClient(mockWriterA, DID_NODE_A); 448 + rmB.setPdsClient(mockWriterB, DID_NODE_B); 449 + 450 + // Both nodes add each other's DID 451 + await rmA.addDid(DID_NODE_B); 452 + await rmB.addDid(DID_NODE_A); 453 + 454 + // Wait for initial sync 455 + await new Promise((r) => setTimeout(r, 3000)); 456 + 457 + // Now run offer discovery on both 458 + const offerManagerA = rmA.getOfferManager(); 459 + const offerManagerB = rmB.getOfferManager(); 460 + expect(offerManagerA).toBeDefined(); 461 + expect(offerManagerB).toBeDefined(); 462 + 463 + // Node A discovers agreements: it should find that Node B has an offer for Node A 464 + const statesA = rmA.getSyncStates(); 465 + const peersA = statesA.filter((s) => s.pdsEndpoint).map((s) => ({ did: s.did, pdsEndpoint: s.pdsEndpoint })); 466 + const agreementsA = await offerManagerA!.discoverAndSync(peersA); 467 + 468 + // Node B discovers agreements similarly 469 + const statesB = rmB.getSyncStates(); 470 + const peersB = statesB.filter((s) => s.pdsEndpoint).map((s) => ({ did: s.did, pdsEndpoint: s.pdsEndpoint })); 471 + const agreementsB = await offerManagerB!.discoverAndSync(peersB); 472 + 473 + // Both should detect one mutual agreement 474 + expect(agreementsA.length).toBe(1); 475 + expect(agreementsA[0]!.counterpartyDid).toBe(DID_NODE_B); 476 + expect(agreementsB.length).toBe(1); 477 + expect(agreementsB[0]!.counterpartyDid).toBe(DID_NODE_A); 478 + 479 + // Verify effective params: max(minCopies), min(intervalSec), max(priority) 480 + expect(agreementsA[0]!.effectiveParams.minCopies).toBe(3); // max(2, 3) 481 + expect(agreementsA[0]!.effectiveParams.intervalSec).toBe(300); // min(300, 600) 482 + expect(agreementsA[0]!.effectiveParams.priority).toBe(75); // max(50, 75) 483 + 484 + // Verify policies were created in the policy engine 485 + const policiesA = peA.getPolicies(); 486 + expect(policiesA.length).toBe(1); 487 + expect(policiesA[0]!.id).toBe(`p2p:${DID_NODE_B}`); 488 + expect(policiesA[0]!.replication.minCopies).toBe(3); 489 + 490 + const policiesB = peB.getPolicies(); 491 + expect(policiesB.length).toBe(1); 492 + expect(policiesB[0]!.id).toBe(`p2p:${DID_NODE_A}`); 493 + }, 30_000); 494 + }); 495 + 496 + /** 497 + * Create a mock RecordWriter backed by the enhanced mock PDS. 498 + * This simulates what PdsClient does: read/write records to the user's PDS. 499 + */ 500 + function createMockRecordWriter(did: string, pds: EnhancedMockPds) { 501 + return { 502 + putRecord: async (collection: string, rkey: string, record: unknown) => { 503 + pds.addRecord(did, collection, rkey, record); 504 + return { uri: `at://${did}/${collection}/${rkey}`, cid: "bafytest" }; 505 + }, 506 + deleteRecord: async (_collection: string, _rkey: string) => { 507 + // No-op for test 508 + }, 509 + listRecords: async (collection: string, _opts: { limit: number }) => { 510 + // Fetch from mock PDS 511 + const res = await fetch( 512 + `${pds.url}/xrpc/com.atproto.repo.listRecords?repo=${encodeURIComponent(did)}&collection=${encodeURIComponent(collection)}&limit=100`, 513 + ); 514 + if (!res.ok) return { records: [] }; 515 + return (await res.json()) as { records: Array<{ uri: string; cid: string; value: unknown }> }; 516 + }, 517 + }; 518 + }