atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add E2E sync tests with mock PDS, sync diagnostics, and fetch timeouts

- Mock PDS test helper (test-helpers.ts): createTestRepo(), startMockPds(),
createMockDidResolver() for fast integration testing with tiny repos
- E2E sync tests (e2e-sync.test.ts): 7 tests covering syncDid() pipeline
against mock PDS — empty/record/blob accounts, admin API, persistence
- Sync progress logging in syncDid(): CAR size, block count, blob stats,
total duration at each checkpoint
- Fetch timeouts in repo-fetcher.ts: 60s for fetchRepo, 30s for fetchBlob
via AbortController to prevent hanging on slow PDSes

+614 -17
+355
src/replication/e2e-sync.test.ts
··· 1 + /** 2 + * E2E sync integration tests: full syncDid() pipeline against mock PDS accounts. 3 + * 4 + * Uses createTestRepo() + startMockPds() to spin up tiny in-process PDS servers, 5 + * then verifies ReplicationManager.syncDid() stores blocks, updates state, etc. 6 + */ 7 + 8 + import { describe, it, expect, beforeEach, afterEach } from "vitest"; 9 + import { mkdtempSync, rmSync } from "node:fs"; 10 + import { tmpdir } from "node:os"; 11 + import { join } from "node:path"; 12 + import Database from "better-sqlite3"; 13 + import { readCarWithRoot } from "@atproto/repo"; 14 + 15 + import { IpfsService, type NetworkService } from "../ipfs.js"; 16 + import { RepoManager } from "../repo-manager.js"; 17 + import type { Config } from "../config.js"; 18 + import { ReplicationManager } from "./replication-manager.js"; 19 + import { 20 + createTestRepo, 21 + startMockPds, 22 + createMockDidResolver, 23 + type MockPds, 24 + } from "./test-helpers.js"; 25 + 26 + const TEST_DID = "did:plc:testuser1"; 27 + const TEST_DID_2 = "did:plc:testuser2"; 28 + 29 + function testConfig(dataDir: string, replicateDids: string[] = []): Config { 30 + return { 31 + DID: "did:plc:localnode", 32 + HANDLE: "local.test", 33 + PDS_HOSTNAME: "local.test", 34 + AUTH_TOKEN: "test-auth-token", 35 + SIGNING_KEY: 36 + "0000000000000000000000000000000000000000000000000000000000000001", 37 + SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr", 38 + JWT_SECRET: "test-jwt-secret", 39 + PASSWORD_HASH: "$2a$10$test", 40 + DATA_DIR: dataDir, 41 + PORT: 3000, 42 + IPFS_ENABLED: true, 43 + IPFS_NETWORKING: false, 44 + REPLICATE_DIDS: replicateDids, 45 + FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 46 + FIREHOSE_ENABLED: false, 47 + RATE_LIMIT_ENABLED: false, 48 + RATE_LIMIT_READ_PER_MIN: 300, 49 + RATE_LIMIT_SYNC_PER_MIN: 30, 50 + RATE_LIMIT_SESSION_PER_MIN: 10, 51 + RATE_LIMIT_WRITE_PER_MIN: 200, 52 + RATE_LIMIT_CHALLENGE_PER_MIN: 20, 53 + RATE_LIMIT_MAX_CONNECTIONS: 100, 54 + RATE_LIMIT_FIREHOSE_PER_IP: 3, 55 + OAUTH_ENABLED: false, 56 + }; 57 + } 58 + 59 + const mockNetworkService: NetworkService = { 60 + provideBlocks: async () => {}, 61 + publishCommitNotification: async () => {}, 62 + onCommitNotification: () => {}, 63 + subscribeCommitTopics: () => {}, 64 + unsubscribeCommitTopics: () => {}, 65 + getPeerId: () => null, 66 + getMultiaddrs: () => [], 67 + getConnectionCount: () => 0, 68 + getRemoteAddrs: () => [], 69 + publishIdentityNotification: async () => {}, 70 + onIdentityNotification: () => {}, 71 + subscribeIdentityTopics: () => {}, 72 + unsubscribeIdentityTopics: () => {}, 73 + }; 74 + 75 + describe("E2E sync integration", () => { 76 + let tmpDir: string; 77 + let db: InstanceType<typeof Database>; 78 + let ipfsService: IpfsService; 79 + let repoManager: RepoManager; 80 + let mockPds: MockPds; 81 + let replicationManager: ReplicationManager; 82 + 83 + afterEach(async () => { 84 + replicationManager?.stop(); 85 + await mockPds?.close(); 86 + if (ipfsService?.isRunning()) await ipfsService.stop(); 87 + db?.close(); 88 + if (tmpDir) rmSync(tmpDir, { recursive: true, force: true }); 89 + }); 90 + 91 + /** 92 + * Set up a full test environment with mock PDS serving the given accounts. 93 + */ 94 + async function setup(opts: { 95 + dids: string[]; 96 + accounts: Array<{ did: string; carBytes: Uint8Array; blobs?: Map<string, Uint8Array> }>; 97 + }) { 98 + tmpDir = mkdtempSync(join(tmpdir(), "e2e-sync-")); 99 + db = new Database(join(tmpDir, "test.db")); 100 + 101 + const config = testConfig(tmpDir, opts.dids); 102 + 103 + repoManager = new RepoManager(db, config); 104 + repoManager.init(); 105 + 106 + ipfsService = new IpfsService({ 107 + blocksPath: join(tmpDir, "blocks"), 108 + datastorePath: join(tmpDir, "datastore"), 109 + networking: false, 110 + }); 111 + await ipfsService.start(); 112 + 113 + mockPds = await startMockPds(opts.accounts); 114 + 115 + const didMapping: Record<string, string> = {}; 116 + for (const did of opts.dids) { 117 + didMapping[did] = mockPds.url; 118 + } 119 + const mockResolver = createMockDidResolver(didMapping); 120 + 121 + replicationManager = new ReplicationManager( 122 + db, 123 + config, 124 + repoManager, 125 + ipfsService, 126 + mockNetworkService, 127 + mockResolver, 128 + ); 129 + await replicationManager.init(); 130 + 131 + return { config, mockResolver }; 132 + } 133 + 134 + it("syncs an empty account (commit block only)", async () => { 135 + const carBytes = await createTestRepo(TEST_DID); 136 + await setup({ 137 + dids: [TEST_DID], 138 + accounts: [{ did: TEST_DID, carBytes }], 139 + }); 140 + 141 + await replicationManager.syncDid(TEST_DID); 142 + 143 + const states = replicationManager.getSyncStates(); 144 + const state = states.find((s) => s.did === TEST_DID); 145 + expect(state).toBeDefined(); 146 + expect(state!.lastSyncRev).toBeTruthy(); 147 + expect(state!.rootCid).toBeTruthy(); 148 + expect(state!.pdsEndpoint).toBe(mockPds.url); 149 + }, 15_000); 150 + 151 + it("syncs an account with records", async () => { 152 + const records = [ 153 + { collection: "app.bsky.feed.post", rkey: "post1", record: { text: "Hello world", createdAt: new Date().toISOString() } }, 154 + { collection: "app.bsky.feed.post", rkey: "post2", record: { text: "Second post", createdAt: new Date().toISOString() } }, 155 + { collection: "app.bsky.actor.profile", rkey: "self", record: { displayName: "Test User" } }, 156 + ]; 157 + const carBytes = await createTestRepo(TEST_DID, records); 158 + await setup({ 159 + dids: [TEST_DID], 160 + accounts: [{ did: TEST_DID, carBytes }], 161 + }); 162 + 163 + await replicationManager.syncDid(TEST_DID); 164 + 165 + const states = replicationManager.getSyncStates(); 166 + const state = states.find((s) => s.did === TEST_DID); 167 + expect(state).toBeDefined(); 168 + expect(state!.lastSyncRev).toBeTruthy(); 169 + 170 + // Verify blocks were stored: parse original CAR and check each block exists 171 + const { blocks } = await readCarWithRoot(carBytes); 172 + const internalMap = (blocks as unknown as { map: Map<string, Uint8Array> }).map; 173 + for (const [cidStr] of internalMap.entries()) { 174 + const hasIt = await ipfsService.hasBlock(cidStr); 175 + expect(hasIt, `block ${cidStr} should be stored`).toBe(true); 176 + } 177 + }, 15_000); 178 + 179 + it("syncs multiple accounts", async () => { 180 + const car1 = await createTestRepo(TEST_DID, [ 181 + { collection: "app.bsky.feed.post", rkey: "a", record: { text: "User 1" } }, 182 + ]); 183 + const car2 = await createTestRepo(TEST_DID_2, [ 184 + { collection: "app.bsky.feed.post", rkey: "b", record: { text: "User 2" } }, 185 + ]); 186 + 187 + await setup({ 188 + dids: [TEST_DID, TEST_DID_2], 189 + accounts: [ 190 + { did: TEST_DID, carBytes: car1 }, 191 + { did: TEST_DID_2, carBytes: car2 }, 192 + ], 193 + }); 194 + 195 + await replicationManager.syncAll(); 196 + 197 + const states = replicationManager.getSyncStates(); 198 + const state1 = states.find((s) => s.did === TEST_DID); 199 + const state2 = states.find((s) => s.did === TEST_DID_2); 200 + expect(state1?.lastSyncRev).toBeTruthy(); 201 + expect(state2?.lastSyncRev).toBeTruthy(); 202 + }, 15_000); 203 + 204 + it("adds a DID via admin API and syncs", async () => { 205 + const carBytes = await createTestRepo(TEST_DID, [ 206 + { collection: "app.bsky.feed.post", rkey: "p1", record: { text: "Admin added" } }, 207 + ]); 208 + 209 + // Start with no replicate DIDs — we'll add via admin API 210 + tmpDir = mkdtempSync(join(tmpdir(), "e2e-sync-")); 211 + db = new Database(join(tmpDir, "test.db")); 212 + const config = testConfig(tmpDir, []); 213 + repoManager = new RepoManager(db, config); 214 + repoManager.init(); 215 + ipfsService = new IpfsService({ 216 + blocksPath: join(tmpDir, "blocks"), 217 + datastorePath: join(tmpDir, "datastore"), 218 + networking: false, 219 + }); 220 + await ipfsService.start(); 221 + mockPds = await startMockPds([{ did: TEST_DID, carBytes }]); 222 + const mockResolver = createMockDidResolver({ [TEST_DID]: mockPds.url }); 223 + 224 + replicationManager = new ReplicationManager( 225 + db, 226 + config, 227 + repoManager, 228 + ipfsService, 229 + mockNetworkService, 230 + mockResolver, 231 + ); 232 + await replicationManager.init(); 233 + 234 + // Add DID via admin interface 235 + const result = await replicationManager.addDid(TEST_DID); 236 + expect(result.status).toBe("added"); 237 + 238 + // addDid fires syncDid in background — wait a bit then check 239 + await new Promise((r) => setTimeout(r, 2000)); 240 + 241 + const states = replicationManager.getSyncStates(); 242 + const state = states.find((s) => s.did === TEST_DID); 243 + expect(state).toBeDefined(); 244 + expect(state!.pdsEndpoint).toBe(mockPds.url); 245 + // The sync may or may not have completed yet, but the DID should be tracked 246 + expect(replicationManager.getReplicateDids()).toContain(TEST_DID); 247 + }, 15_000); 248 + 249 + it("persists sync state across ReplicationManager restarts", async () => { 250 + const carBytes = await createTestRepo(TEST_DID, [ 251 + { collection: "app.bsky.feed.post", rkey: "p1", record: { text: "Persistent" } }, 252 + ]); 253 + await setup({ 254 + dids: [TEST_DID], 255 + accounts: [{ did: TEST_DID, carBytes }], 256 + }); 257 + 258 + await replicationManager.syncDid(TEST_DID); 259 + 260 + const statesBefore = replicationManager.getSyncStates(); 261 + const stateBefore = statesBefore.find((s) => s.did === TEST_DID); 262 + expect(stateBefore?.lastSyncRev).toBeTruthy(); 263 + 264 + // Stop and create a new ReplicationManager on the same DB 265 + replicationManager.stop(); 266 + 267 + const config2 = testConfig(tmpDir, [TEST_DID]); 268 + const mockResolver2 = createMockDidResolver({ [TEST_DID]: mockPds.url }); 269 + const replicationManager2 = new ReplicationManager( 270 + db, 271 + config2, 272 + repoManager, 273 + ipfsService, 274 + mockNetworkService, 275 + mockResolver2, 276 + ); 277 + await replicationManager2.init(); 278 + 279 + const statesAfter = replicationManager2.getSyncStates(); 280 + const stateAfter = statesAfter.find((s) => s.did === TEST_DID); 281 + expect(stateAfter).toBeDefined(); 282 + expect(stateAfter!.lastSyncRev).toBe(stateBefore!.lastSyncRev); 283 + expect(stateAfter!.rootCid).toBe(stateBefore!.rootCid); 284 + 285 + replicationManager2.stop(); 286 + // Replace reference so afterEach doesn't double-stop 287 + replicationManager = replicationManager2; 288 + }, 15_000); 289 + 290 + it("handles sync of account with blobs", async () => { 291 + // Create a blob and a record that references it 292 + const blobBytes = new TextEncoder().encode("fake image data for testing"); 293 + const { create: createCid, CODEC_RAW, toString: cidToString } = await import("@atcute/cid"); 294 + const blobCid = cidToString(await createCid(CODEC_RAW, blobBytes)); 295 + 296 + const carBytes = await createTestRepo(TEST_DID, [ 297 + { 298 + collection: "app.bsky.feed.post", 299 + rkey: "with-blob", 300 + record: { 301 + text: "Post with image", 302 + embed: { 303 + $type: "app.bsky.embed.images", 304 + images: [ 305 + { 306 + alt: "test", 307 + image: { 308 + $type: "blob", 309 + ref: { $link: blobCid }, 310 + mimeType: "image/jpeg", 311 + size: blobBytes.length, 312 + }, 313 + }, 314 + ], 315 + }, 316 + }, 317 + }, 318 + ]); 319 + 320 + const blobs = new Map<string, Uint8Array>(); 321 + blobs.set(blobCid, blobBytes); 322 + 323 + await setup({ 324 + dids: [TEST_DID], 325 + accounts: [{ did: TEST_DID, carBytes, blobs }], 326 + }); 327 + 328 + await replicationManager.syncDid(TEST_DID); 329 + 330 + const states = replicationManager.getSyncStates(); 331 + const state = states.find((s) => s.did === TEST_DID); 332 + expect(state).toBeDefined(); 333 + expect(state!.lastSyncRev).toBeTruthy(); 334 + 335 + // Note: blob sync requires ReplicatedRepoReader which we don't set up here. 336 + // The main syncDid pipeline (block storage) should still complete successfully. 337 + }, 15_000); 338 + 339 + it("sync completes within reasonable time for small repos", async () => { 340 + const carBytes = await createTestRepo(TEST_DID, [ 341 + { collection: "app.bsky.feed.post", rkey: "t1", record: { text: "Timing test" } }, 342 + ]); 343 + await setup({ 344 + dids: [TEST_DID], 345 + accounts: [{ did: TEST_DID, carBytes }], 346 + }); 347 + 348 + const start = Date.now(); 349 + await replicationManager.syncDid(TEST_DID); 350 + const elapsed = Date.now() - start; 351 + 352 + // A tiny mock repo should sync in well under 5 seconds 353 + expect(elapsed).toBeLessThan(5000); 354 + }, 15_000); 355 + });
+11
src/replication/replication-manager.ts
··· 567 567 // 3. Fetch repo (with incremental sync if we have a previous rev) 568 568 const since = state?.lastSyncRev ?? undefined; 569 569 const syncEventId = this.syncStorage.startSyncEvent(did, "pds"); 570 + console.log(`[sync] ${did} — fetching repo from ${pdsEndpoint}${since ? ` (since: ${since.slice(0, 8)}…)` : ""}`); 570 571 let carBytes: Uint8Array; 571 572 try { 572 573 carBytes = await this.repoFetcher.fetchRepo( ··· 599 600 600 601 try { 601 602 // 4. Parse CAR and store blocks 603 + const carSizeKb = (carBytes.length / 1024).toFixed(1); 604 + console.log(`[sync] ${did} — CAR received: ${carSizeKb} KB, parsing...`); 602 605 const { root, blocks } = await readCarWithRoot(carBytes); 603 606 604 607 await this.blockStore.putBlocks(blocks); ··· 615 618 blockEntries.push({ cid: cidStr, sizeBytes: blockBytes.length }); 616 619 } 617 620 } 621 + 622 + console.log(`[sync] ${did} — stored ${blockEntries.length} blocks, verifying...`); 618 623 619 624 // 5b. Track block CIDs with sizes for remote verification 620 625 if (blockEntries.length > 0) { ··· 752 757 } 753 758 754 759 // 11. Sync blobs and capture result 760 + console.log(`[sync] ${did} — syncing blobs...`); 755 761 const blobResult = await this.syncBlobs(did); 756 762 757 763 // 12. Complete sync event with metrics 764 + const totalDuration = ((Date.now() - syncStart) / 1000).toFixed(1); 765 + const totalBlobKb = (blobResult.totalBytes / 1024).toFixed(1); 766 + console.log( 767 + `[sync] ${did} — complete: ${blockEntries.length} blocks, ${blobResult.fetched} blobs (${totalBlobKb} KB), ${totalDuration}s`, 768 + ); 758 769 this.syncStorage.completeSyncEvent(syncEventId, { 759 770 status: "success", 760 771 blocksAdded: blockEntries.length,
+35 -17
src/replication/repo-fetcher.ts
··· 4 4 5 5 import type { DidResolver, DidDocument } from "../did-resolver.js"; 6 6 7 + /** Default timeout for fetchRepo (60 seconds). */ 8 + const REPO_FETCH_TIMEOUT_MS = 60_000; 9 + 10 + /** Default timeout for fetchBlob (30 seconds). */ 11 + const BLOB_FETCH_TIMEOUT_MS = 30_000; 12 + 7 13 export class RepoFetcher { 8 14 constructor(private didResolver: DidResolver) {} 9 15 ··· 20 26 /** 21 27 * Fetch a full repo as CAR bytes from a remote PDS. 22 28 * Uses com.atproto.sync.getRepo XRPC endpoint. 29 + * Aborts after 60s by default to prevent hanging on slow PDSes. 23 30 */ 24 31 async fetchRepo( 25 32 pdsEndpoint: string, ··· 35 42 url.searchParams.set("since", since); 36 43 } 37 44 38 - const res = await fetch(url.toString()); 39 - if (!res.ok) { 40 - throw new Error( 41 - `Failed to fetch repo for ${did}: ${res.status} ${res.statusText}`, 42 - ); 45 + const controller = new AbortController(); 46 + const timeout = setTimeout(() => controller.abort(), REPO_FETCH_TIMEOUT_MS); 47 + try { 48 + const res = await fetch(url.toString(), { signal: controller.signal }); 49 + if (!res.ok) { 50 + throw new Error( 51 + `Failed to fetch repo for ${did}: ${res.status} ${res.statusText}`, 52 + ); 53 + } 54 + return new Uint8Array(await res.arrayBuffer()); 55 + } finally { 56 + clearTimeout(timeout); 43 57 } 44 - 45 - return new Uint8Array(await res.arrayBuffer()); 46 58 } 47 59 48 60 /** ··· 73 85 /** 74 86 * Fetch a blob from a remote PDS. 75 87 * Returns null on 404 (blob deleted upstream), throws on other errors. 88 + * Aborts after 30s by default to prevent hanging on slow PDSes. 76 89 */ 77 90 async fetchBlob( 78 91 pdsEndpoint: string, ··· 86 99 url.searchParams.set("did", did); 87 100 url.searchParams.set("cid", cid); 88 101 89 - const res = await fetch(url.toString()); 90 - if (res.status === 404) { 91 - return null; 92 - } 93 - if (!res.ok) { 94 - throw new Error( 95 - `Failed to fetch blob ${cid} for ${did}: ${res.status} ${res.statusText}`, 96 - ); 102 + const controller = new AbortController(); 103 + const timeout = setTimeout(() => controller.abort(), BLOB_FETCH_TIMEOUT_MS); 104 + try { 105 + const res = await fetch(url.toString(), { signal: controller.signal }); 106 + if (res.status === 404) { 107 + return null; 108 + } 109 + if (!res.ok) { 110 + throw new Error( 111 + `Failed to fetch blob ${cid} for ${did}: ${res.status} ${res.statusText}`, 112 + ); 113 + } 114 + return new Uint8Array(await res.arrayBuffer()); 115 + } finally { 116 + clearTimeout(timeout); 97 117 } 98 - 99 - return new Uint8Array(await res.arrayBuffer()); 100 118 } 101 119 102 120 /**
+213
src/replication/test-helpers.ts
··· 1 + /** 2 + * Mock PDS and test repo builder for integration tests. 3 + * 4 + * Provides: 5 + * - createTestRepo(did, records?) — Build a minimal valid atproto repo as CAR bytes 6 + * - startMockPds(accounts) — HTTP server serving XRPC endpoints for test accounts 7 + * - createMockDidResolver(mapping) — DidResolver that resolves test DIDs to mock PDS URLs 8 + */ 9 + 10 + import { createServer, type Server } from "node:http"; 11 + import { 12 + Repo, 13 + MemoryBlockstore, 14 + blocksToCarFile, 15 + WriteOpAction, 16 + type RecordCreateOp, 17 + } from "@atproto/repo"; 18 + import { Secp256k1Keypair } from "@atproto/crypto"; 19 + import type { DidResolver, DidDocument } from "../did-resolver.js"; 20 + 21 + /** A fixed test signing key (32 bytes hex). */ 22 + const TEST_SIGNING_KEY = 23 + "0000000000000000000000000000000000000000000000000000000000000001"; 24 + 25 + export interface TestAccount { 26 + did: string; 27 + carBytes: Uint8Array; 28 + /** Blob CID → blob bytes, served by mock PDS */ 29 + blobs?: Map<string, Uint8Array>; 30 + } 31 + 32 + /** 33 + * Create a minimal valid atproto repo as CAR bytes. 34 + * Uses @atproto/repo's MemoryBlockstore + Repo.create() with a real MST. 35 + */ 36 + export async function createTestRepo( 37 + did: string, 38 + records?: Array<{ collection: string; rkey: string; record: Record<string, unknown> }>, 39 + ): Promise<Uint8Array> { 40 + const storage = new MemoryBlockstore(); 41 + const keypair = await Secp256k1Keypair.import(TEST_SIGNING_KEY); 42 + 43 + const initialWrites: RecordCreateOp[] = (records ?? []).map((r) => ({ 44 + action: WriteOpAction.Create, 45 + collection: r.collection, 46 + rkey: r.rkey, 47 + record: r.record, 48 + })); 49 + 50 + const repo = await Repo.create(storage, did, keypair, initialWrites); 51 + const carBytes = await blocksToCarFile(repo.cid, storage.blocks); 52 + return carBytes; 53 + } 54 + 55 + /** 56 + * Create a second version of a repo (for incremental sync testing). 57 + * Creates initial repo, applies additional writes, returns both CARs. 58 + */ 59 + export async function createTestRepoWithUpdate( 60 + did: string, 61 + initialRecords: Array<{ collection: string; rkey: string; record: Record<string, unknown> }>, 62 + additionalRecords: Array<{ collection: string; rkey: string; record: Record<string, unknown> }>, 63 + ): Promise<{ initialCar: Uint8Array; updatedCar: Uint8Array; fullCar: Uint8Array }> { 64 + const storage = new MemoryBlockstore(); 65 + const keypair = await Secp256k1Keypair.import(TEST_SIGNING_KEY); 66 + 67 + const initialWrites: RecordCreateOp[] = initialRecords.map((r) => ({ 68 + action: WriteOpAction.Create, 69 + collection: r.collection, 70 + rkey: r.rkey, 71 + record: r.record, 72 + })); 73 + 74 + const repo = await Repo.create(storage, did, keypair, initialWrites); 75 + const initialCar = await blocksToCarFile(repo.cid, storage.blocks); 76 + 77 + const updateWrites: RecordCreateOp[] = additionalRecords.map((r) => ({ 78 + action: WriteOpAction.Create, 79 + collection: r.collection, 80 + rkey: r.rkey, 81 + record: r.record, 82 + })); 83 + 84 + const updatedRepo = await repo.applyWrites(updateWrites, keypair); 85 + const fullCar = await blocksToCarFile(updatedRepo.cid, storage.blocks); 86 + 87 + // For incremental: just the new blocks from the commit 88 + const commitData = await repo.formatCommit(updateWrites, keypair); 89 + const updatedCar = await blocksToCarFile(commitData.cid, commitData.newBlocks); 90 + 91 + return { initialCar, updatedCar, fullCar }; 92 + } 93 + 94 + export interface MockPds { 95 + url: string; 96 + port: number; 97 + close: () => Promise<void>; 98 + /** Update the CAR bytes served for a DID (for incremental sync testing) */ 99 + updateAccount: (did: string, carBytes: Uint8Array) => void; 100 + } 101 + 102 + /** 103 + * Start an HTTP server that serves XRPC endpoints for test accounts. 104 + * Serves: com.atproto.sync.getRepo, com.atproto.sync.getBlob, com.atproto.repo.getRecord 105 + */ 106 + export async function startMockPds( 107 + accounts: TestAccount[], 108 + ): Promise<MockPds> { 109 + const accountMap = new Map<string, TestAccount>(); 110 + for (const account of accounts) { 111 + accountMap.set(account.did, account); 112 + } 113 + 114 + const server = createServer((req, res) => { 115 + const url = new URL(req.url ?? "/", `http://localhost`); 116 + const pathname = url.pathname; 117 + 118 + if (pathname === "/xrpc/com.atproto.sync.getRepo") { 119 + const did = url.searchParams.get("did"); 120 + if (!did || !accountMap.has(did)) { 121 + res.writeHead(404, { "Content-Type": "application/json" }); 122 + res.end(JSON.stringify({ error: "RepoNotFound" })); 123 + return; 124 + } 125 + const account = accountMap.get(did)!; 126 + res.writeHead(200, { 127 + "Content-Type": "application/vnd.ipld.car", 128 + "Content-Length": String(account.carBytes.length), 129 + }); 130 + res.end(Buffer.from(account.carBytes)); 131 + return; 132 + } 133 + 134 + if (pathname === "/xrpc/com.atproto.sync.getBlob") { 135 + const did = url.searchParams.get("did"); 136 + const cid = url.searchParams.get("cid"); 137 + if (!did || !cid || !accountMap.has(did)) { 138 + res.writeHead(404, { "Content-Type": "application/json" }); 139 + res.end(JSON.stringify({ error: "BlobNotFound" })); 140 + return; 141 + } 142 + const account = accountMap.get(did)!; 143 + const blobBytes = account.blobs?.get(cid); 144 + if (!blobBytes) { 145 + res.writeHead(404, { "Content-Type": "application/json" }); 146 + res.end(JSON.stringify({ error: "BlobNotFound" })); 147 + return; 148 + } 149 + res.writeHead(200, { 150 + "Content-Type": "application/octet-stream", 151 + "Content-Length": String(blobBytes.length), 152 + }); 153 + res.end(Buffer.from(blobBytes)); 154 + return; 155 + } 156 + 157 + if (pathname === "/xrpc/com.atproto.repo.listRecords") { 158 + // Return empty records list (sufficient for tests) 159 + res.writeHead(200, { "Content-Type": "application/json" }); 160 + res.end(JSON.stringify({ records: [] })); 161 + return; 162 + } 163 + 164 + res.writeHead(404, { "Content-Type": "application/json" }); 165 + res.end(JSON.stringify({ error: "NotFound" })); 166 + }); 167 + 168 + return new Promise((resolve) => { 169 + server.listen(0, "127.0.0.1", () => { 170 + const addr = server.address() as { port: number }; 171 + const url = `http://127.0.0.1:${addr.port}`; 172 + resolve({ 173 + url, 174 + port: addr.port, 175 + close: () => 176 + new Promise<void>((res) => server.close(() => res())), 177 + updateAccount: (did: string, carBytes: Uint8Array) => { 178 + const existing = accountMap.get(did); 179 + if (existing) { 180 + existing.carBytes = carBytes; 181 + } else { 182 + accountMap.set(did, { did, carBytes }); 183 + } 184 + }, 185 + }); 186 + }); 187 + }); 188 + } 189 + 190 + /** 191 + * Create a mock DidResolver that maps test DIDs to a mock PDS URL. 192 + * Returns DidDocument with #atproto_pds serviceEndpoint. 193 + */ 194 + export function createMockDidResolver( 195 + mapping: Record<string, string>, 196 + ): DidResolver { 197 + return { 198 + resolve: async (did: string): Promise<DidDocument | null> => { 199 + const pdsUrl = mapping[did]; 200 + if (!pdsUrl) return null; 201 + return { 202 + id: did, 203 + service: [ 204 + { 205 + id: "#atproto_pds", 206 + type: "AtprotoPersonalDataServer", 207 + serviceEndpoint: pdsUrl, 208 + }, 209 + ], 210 + } as unknown as DidDocument; 211 + }, 212 + } as DidResolver; 213 + }