atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add incremental sync: serve only changed blocks when since rev is known

When a peer or HTTP client provides a `since` parameter, compute the MST
diff between current and since states, serving only new/changed blocks
instead of the full repo CAR. Falls back to full CAR if the since rev is
unknown or old blocks have been GC'd.

+411 -8
+318
src/replication/incremental-sync.test.ts
··· 1 + /** 2 + * Tests for incremental sync: 3 + * - SyncStorage.getRootCidForRev() 4 + * - generateCarForDid() with since parameter (incremental CAR) 5 + */ 6 + 7 + import { describe, it, expect, beforeEach, afterEach } from "vitest"; 8 + import { mkdtempSync, rmSync } from "node:fs"; 9 + import { tmpdir } from "node:os"; 10 + import { join } from "node:path"; 11 + import Database from "better-sqlite3"; 12 + import { IpfsService } from "../ipfs.js"; 13 + import { RepoManager } from "../repo-manager.js"; 14 + import type { Config } from "../config.js"; 15 + import { readCarWithRoot } from "@atproto/repo"; 16 + 17 + import { SyncStorage } from "./sync-storage.js"; 18 + import { generateCarForDid } from "../xrpc/sync.js"; 19 + 20 + function testConfig(dataDir: string): Config { 21 + return { 22 + DID: "did:plc:test123", 23 + HANDLE: "test.example.com", 24 + PDS_HOSTNAME: "test.example.com", 25 + AUTH_TOKEN: "test-auth-token", 26 + SIGNING_KEY: 27 + "0000000000000000000000000000000000000000000000000000000000000001", 28 + SIGNING_KEY_PUBLIC: 29 + "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr", 30 + JWT_SECRET: "test-jwt-secret", 31 + PASSWORD_HASH: "$2a$10$test", 32 + DATA_DIR: dataDir, 33 + PORT: 3000, 34 + IPFS_ENABLED: true, 35 + IPFS_NETWORKING: false, 36 + REPLICATE_DIDS: [], 37 + FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 38 + FIREHOSE_ENABLED: false, 39 + RATE_LIMIT_ENABLED: false, 40 + RATE_LIMIT_READ_PER_MIN: 300, 41 + RATE_LIMIT_SYNC_PER_MIN: 30, 42 + RATE_LIMIT_SESSION_PER_MIN: 10, 43 + RATE_LIMIT_WRITE_PER_MIN: 200, 44 + RATE_LIMIT_CHALLENGE_PER_MIN: 20, 45 + RATE_LIMIT_MAX_CONNECTIONS: 100, 46 + RATE_LIMIT_FIREHOSE_PER_IP: 3, 47 + OAUTH_ENABLED: false, 48 + }; 49 + } 50 + 51 + const TEST_DID = "did:plc:testrepo"; 52 + 53 + // ============================================ 54 + // SyncStorage.getRootCidForRev() 55 + // ============================================ 56 + 57 + describe("SyncStorage.getRootCidForRev", () => { 58 + let tmpDir: string; 59 + let db: InstanceType<typeof Database>; 60 + let storage: SyncStorage; 61 + 62 + beforeEach(() => { 63 + tmpDir = mkdtempSync(join(tmpdir(), "incremental-sync-test-")); 64 + db = new Database(join(tmpDir, "test.db")); 65 + storage = new SyncStorage(db); 66 + storage.initSchema(); 67 + }); 68 + 69 + afterEach(() => { 70 + db.close(); 71 + try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} 72 + }); 73 + 74 + it("returns root_cid for a known successful sync", () => { 75 + const eventId = storage.startSyncEvent(TEST_DID, "pds"); 76 + storage.completeSyncEvent(eventId, { 77 + status: "success", 78 + rev: "rev-001", 79 + rootCid: "bafyreiabc123", 80 + }); 81 + 82 + const result = storage.getRootCidForRev(TEST_DID, "rev-001"); 83 + expect(result).toBe("bafyreiabc123"); 84 + }); 85 + 86 + it("returns null for unknown rev", () => { 87 + const eventId = storage.startSyncEvent(TEST_DID, "pds"); 88 + storage.completeSyncEvent(eventId, { 89 + status: "success", 90 + rev: "rev-001", 91 + rootCid: "bafyreiabc123", 92 + }); 93 + 94 + const result = storage.getRootCidForRev(TEST_DID, "rev-unknown"); 95 + expect(result).toBeNull(); 96 + }); 97 + 98 + it("returns null for unknown DID", () => { 99 + const eventId = storage.startSyncEvent(TEST_DID, "pds"); 100 + storage.completeSyncEvent(eventId, { 101 + status: "success", 102 + rev: "rev-001", 103 + rootCid: "bafyreiabc123", 104 + }); 105 + 106 + const result = storage.getRootCidForRev("did:plc:unknown", "rev-001"); 107 + expect(result).toBeNull(); 108 + }); 109 + 110 + it("excludes failed syncs", () => { 111 + const eventId = storage.startSyncEvent(TEST_DID, "pds"); 112 + storage.completeSyncEvent(eventId, { 113 + status: "error", 114 + rev: "rev-001", 115 + rootCid: "bafyreiabc123", 116 + errorMessage: "fetch failed", 117 + }); 118 + 119 + const result = storage.getRootCidForRev(TEST_DID, "rev-001"); 120 + expect(result).toBeNull(); 121 + }); 122 + 123 + it("excludes syncs without root_cid", () => { 124 + const eventId = storage.startSyncEvent(TEST_DID, "pds"); 125 + storage.completeSyncEvent(eventId, { 126 + status: "success", 127 + rev: "rev-001", 128 + }); 129 + 130 + const result = storage.getRootCidForRev(TEST_DID, "rev-001"); 131 + expect(result).toBeNull(); 132 + }); 133 + 134 + it("returns most recent root_cid when multiple syncs exist for same rev", () => { 135 + const event1 = storage.startSyncEvent(TEST_DID, "pds"); 136 + storage.completeSyncEvent(event1, { 137 + status: "success", 138 + rev: "rev-001", 139 + rootCid: "bafyreiold", 140 + }); 141 + 142 + const event2 = storage.startSyncEvent(TEST_DID, "pds"); 143 + storage.completeSyncEvent(event2, { 144 + status: "success", 145 + rev: "rev-001", 146 + rootCid: "bafyreinew", 147 + }); 148 + 149 + const result = storage.getRootCidForRev(TEST_DID, "rev-001"); 150 + expect(result).toBe("bafyreinew"); 151 + }); 152 + }); 153 + 154 + // ============================================ 155 + // Incremental CAR generation 156 + // ============================================ 157 + 158 + describe("Incremental CAR generation", () => { 159 + let tmpDir: string; 160 + let db: InstanceType<typeof Database>; 161 + let ipfsService: IpfsService; 162 + let repoManager: RepoManager; 163 + let syncStorage: SyncStorage; 164 + 165 + beforeEach(async () => { 166 + tmpDir = mkdtempSync(join(tmpdir(), "incremental-car-test-")); 167 + const config = testConfig(tmpDir); 168 + 169 + db = new Database(join(tmpDir, "test.db")); 170 + ipfsService = new IpfsService({ 171 + blocksPath: join(tmpDir, "ipfs-blocks"), 172 + datastorePath: join(tmpDir, "ipfs-datastore"), 173 + networking: false, 174 + }); 175 + await ipfsService.start(); 176 + 177 + repoManager = new RepoManager(db, config); 178 + repoManager.init(undefined, ipfsService, ipfsService); 179 + 180 + syncStorage = new SyncStorage(db); 181 + syncStorage.initSchema(); 182 + }); 183 + 184 + afterEach(async () => { 185 + if (ipfsService.isRunning()) { 186 + await ipfsService.stop(); 187 + } 188 + db.close(); 189 + try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} 190 + }); 191 + 192 + /** 193 + * Helper: snapshot current repo state into IPFS + SyncStorage. 194 + * Returns { rev, rootCid, blockCids }. 195 + */ 196 + async function snapshotRepo(): Promise<{ rev: string; rootCid: string; blockCids: string[] }> { 197 + const status = await repoManager.getRepoStatus(); 198 + const carBytes = await repoManager.getRepoCar(); 199 + const { root, blocks } = await readCarWithRoot(carBytes); 200 + await ipfsService.putBlocks(blocks); 201 + 202 + const rootCid = root.toString(); 203 + const blockCids: string[] = []; 204 + const blockMap = (blocks as unknown as { map: Map<string, Uint8Array> }).map; 205 + for (const [cidStr] of blockMap) { 206 + blockCids.push(cidStr); 207 + } 208 + 209 + // Track in sync storage 210 + syncStorage.upsertState({ did: TEST_DID, pdsEndpoint: "https://pds.example.com" }); 211 + syncStorage.updateSyncProgress(TEST_DID, status.rev, rootCid); 212 + syncStorage.clearBlocks(TEST_DID); 213 + syncStorage.trackBlocks(TEST_DID, blockCids); 214 + 215 + // Record in sync history 216 + const eventId = syncStorage.startSyncEvent(TEST_DID, "test"); 217 + syncStorage.completeSyncEvent(eventId, { 218 + status: "success", 219 + rev: status.rev, 220 + rootCid, 221 + blocksAdded: blockCids.length, 222 + }); 223 + 224 + return { rev: status.rev, rootCid, blockCids }; 225 + } 226 + 227 + it("returns smaller CAR for incremental sync after changes", async () => { 228 + // Create initial records 229 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 230 + $type: "app.bsky.feed.post", 231 + text: "First post", 232 + createdAt: "2025-01-01T00:00:00.000Z", 233 + }); 234 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 235 + $type: "app.bsky.feed.post", 236 + text: "Second post", 237 + createdAt: "2025-01-01T00:00:01.000Z", 238 + }); 239 + 240 + const snapshot1 = await snapshotRepo(); 241 + 242 + // Add more records 243 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 244 + $type: "app.bsky.feed.post", 245 + text: "Third post", 246 + createdAt: "2025-01-01T00:00:02.000Z", 247 + }); 248 + 249 + const snapshot2 = await snapshotRepo(); 250 + 251 + // Full CAR (no since) 252 + const fullCar = await generateCarForDid(TEST_DID, ipfsService, syncStorage); 253 + expect(fullCar).not.toBeNull(); 254 + 255 + // Incremental CAR (since first snapshot) 256 + const incrementalCar = await generateCarForDid(TEST_DID, ipfsService, syncStorage, snapshot1.rev); 257 + expect(incrementalCar).not.toBeNull(); 258 + 259 + // Incremental should be smaller than full 260 + expect(incrementalCar!.length).toBeLessThan(fullCar!.length); 261 + }); 262 + 263 + it("falls back to full CAR for unknown since rev", async () => { 264 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 265 + $type: "app.bsky.feed.post", 266 + text: "Hello", 267 + createdAt: "2025-01-01T00:00:00.000Z", 268 + }); 269 + 270 + await snapshotRepo(); 271 + 272 + // Full CAR 273 + const fullCar = await generateCarForDid(TEST_DID, ipfsService, syncStorage); 274 + expect(fullCar).not.toBeNull(); 275 + 276 + // Incremental with unknown rev → should fall back to full 277 + const fallbackCar = await generateCarForDid(TEST_DID, ipfsService, syncStorage, "unknown-rev"); 278 + expect(fallbackCar).not.toBeNull(); 279 + 280 + // Should be same size as full (fallback) 281 + expect(fallbackCar!.length).toBe(fullCar!.length); 282 + }); 283 + 284 + it("returns minimal CAR when since matches current rev", async () => { 285 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 286 + $type: "app.bsky.feed.post", 287 + text: "Hello", 288 + createdAt: "2025-01-01T00:00:00.000Z", 289 + }); 290 + 291 + const snapshot = await snapshotRepo(); 292 + 293 + // Full CAR 294 + const fullCar = await generateCarForDid(TEST_DID, ipfsService, syncStorage); 295 + expect(fullCar).not.toBeNull(); 296 + 297 + // Incremental with since = current rev → minimal CAR (just commit block) 298 + const minimalCar = await generateCarForDid(TEST_DID, ipfsService, syncStorage, snapshot.rev); 299 + expect(minimalCar).not.toBeNull(); 300 + 301 + // Minimal should be much smaller than full 302 + expect(minimalCar!.length).toBeLessThan(fullCar!.length); 303 + 304 + // Parse the minimal CAR to verify it has blocks 305 + const { root } = await readCarWithRoot(minimalCar!); 306 + expect(root.toString()).toBe(snapshot.rootCid); 307 + }); 308 + 309 + it("returns null for DID with no sync state", async () => { 310 + const result = await generateCarForDid("did:plc:nonexistent", ipfsService, syncStorage); 311 + expect(result).toBeNull(); 312 + }); 313 + 314 + it("returns null for DID with no sync state even with since", async () => { 315 + const result = await generateCarForDid("did:plc:nonexistent", ipfsService, syncStorage, "some-rev"); 316 + expect(result).toBeNull(); 317 + }); 318 + });
+4 -7
src/replication/libp2p-sync.ts
··· 77 77 * 78 78 * When a peer requests a repo via this protocol, the handler: 79 79 * 1. Reads the CBOR-encoded request (DID + optional since) 80 - * 2. Generates a CAR file from the local blockstore 80 + * 2. Generates a CAR file from the local blockstore (incremental if since is provided) 81 81 * 3. Sends status byte + CAR bytes back 82 - * 83 - * Note: `since` is accepted in the request but not yet used for incremental 84 - * CAR generation — the full repo is always served. Incremental support can 85 - * be added later by filtering blocks newer than `since`. 86 82 */ 87 83 export function registerRepoSyncProtocol( 88 84 libp2p: Libp2p, ··· 113 109 `[libp2p-sync] Serving repo for ${request.did}${request.since ? ` (since: ${request.since.slice(0, 8)}…)` : ""}`, 114 110 ); 115 111 116 - // Generate CAR 112 + // Generate CAR (incremental if since is provided) 117 113 const carBytes = await generateCarForDid( 118 114 request.did, 119 115 blockStore, 120 116 syncStorage, 117 + request.since, 121 118 ); 122 119 123 120 if (!carBytes) { ··· 135 132 await stream.close(); 136 133 137 134 console.log( 138 - `[libp2p-sync] Served ${(carBytes.length / 1024).toFixed(1)} KB for ${request.did}`, 135 + `[libp2p-sync] Served ${(carBytes.length / 1024).toFixed(1)} KB ${request.since ? "incremental" : "full"} CAR for ${request.did}`, 139 136 ); 140 137 } catch (err) { 141 138 stream.abort(
+15
src/replication/sync-storage.ts
··· 958 958 return result; 959 959 } 960 960 961 + /** 962 + * Look up the root CID for a specific (did, rev) from sync_history. 963 + * Returns null if no successful sync with that rev is found. 964 + */ 965 + getRootCidForRev(did: string, rev: string): string | null { 966 + const row = this.db 967 + .prepare( 968 + `SELECT root_cid FROM sync_history 969 + WHERE did = ? AND rev = ? AND status = 'success' AND root_cid IS NOT NULL 970 + ORDER BY id DESC LIMIT 1`, 971 + ) 972 + .get(did, rev) as { root_cid: string } | undefined; 973 + return row?.root_cid ?? null; 974 + } 975 + 961 976 private rowToSyncHistory(row: Record<string, unknown>): SyncHistoryRow { 962 977 return { 963 978 id: row.id as number,
+74 -1
src/xrpc/sync.ts
··· 8 8 import type { SyncStorage } from "../replication/sync-storage.js"; 9 9 import { BlockMap, blocksToCarFile } from "@atproto/repo"; 10 10 import { CID } from "@atproto/lex-data"; 11 + import { extractAllCids } from "../replication/mst-proof.js"; 11 12 12 13 /** 13 14 * Generate CAR bytes for a replicated DID from the blockstore. 15 + * When `since` is provided, attempts incremental CAR (only changed blocks). 16 + * Falls back to full CAR if `since` rev is unknown or blocks are unavailable. 14 17 * Returns null if the DID has no sync state or root CID. 15 18 */ 16 19 export async function generateCarForDid( 17 20 did: string, 18 21 blockStore: BlockStore, 19 22 syncStorage: SyncStorage, 23 + since?: string, 20 24 ): Promise<Uint8Array | null> { 21 25 const state = syncStorage.getState(did); 22 26 if (!state?.rootCid) return null; 23 27 28 + // Attempt incremental if since is provided 29 + if (since) { 30 + const incremental = await generateIncrementalCarForDid( 31 + did, 32 + blockStore, 33 + syncStorage, 34 + state.rootCid, 35 + since, 36 + ); 37 + if (incremental) return incremental; 38 + // Fall through to full CAR 39 + } 40 + 24 41 const blockCids = syncStorage.getBlockCids(did); 25 42 const blocks = new BlockMap(); 26 43 for (const cidStr of blockCids) { ··· 33 50 return blocksToCarFile(root, blocks); 34 51 } 35 52 53 + /** 54 + * Generate an incremental CAR containing only blocks that changed since `sinceRev`. 55 + * Returns null if incremental generation is not possible (unknown rev, missing blocks). 56 + */ 57 + async function generateIncrementalCarForDid( 58 + did: string, 59 + blockStore: BlockStore, 60 + syncStorage: SyncStorage, 61 + currentRootCid: string, 62 + sinceRev: string, 63 + ): Promise<Uint8Array | null> { 64 + try { 65 + const sinceRootCid = syncStorage.getRootCidForRev(did, sinceRev); 66 + if (!sinceRootCid) return null; 67 + 68 + // Same root — no changes, return minimal CAR with just the commit block 69 + if (sinceRootCid === currentRootCid) { 70 + const commitBytes = await blockStore.getBlock(currentRootCid); 71 + if (!commitBytes) return null; 72 + const blocks = new BlockMap(); 73 + blocks.set(CID.parse(currentRootCid), commitBytes); 74 + return blocksToCarFile(CID.parse(currentRootCid), blocks); 75 + } 76 + 77 + // Walk both MSTs to get full CID sets 78 + const [currentCids, sinceCids] = await Promise.all([ 79 + extractAllCids(blockStore, currentRootCid), 80 + extractAllCids(blockStore, sinceRootCid), 81 + ]); 82 + 83 + // Diff: CIDs in current but not in since 84 + const diffCids: string[] = []; 85 + for (const cid of currentCids) { 86 + if (!sinceCids.has(cid)) { 87 + diffCids.push(cid); 88 + } 89 + } 90 + 91 + // Build CAR from diff blocks 92 + const blocks = new BlockMap(); 93 + for (const cidStr of diffCids) { 94 + const bytes = await blockStore.getBlock(cidStr); 95 + if (bytes) { 96 + blocks.set(CID.parse(cidStr), bytes); 97 + } 98 + } 99 + 100 + const root = CID.parse(currentRootCid); 101 + return blocksToCarFile(root, blocks); 102 + } catch { 103 + // Any error (GC'd blocks, corrupt data) → fall back to full CAR 104 + return null; 105 + } 106 + } 107 + 36 108 export async function getRepo( 37 109 c: Context<AppEnv>, 38 110 repoManager: RepoManager | undefined, ··· 69 141 70 142 // Replicated DID: serve from BlockStore 71 143 if (blockStore && syncStorage) { 72 - const carBytes = await generateCarForDid(did, blockStore, syncStorage); 144 + const since = c.req.query("since"); 145 + const carBytes = await generateCarForDid(did, blockStore, syncStorage, since); 73 146 if (carBytes) { 74 147 return new Response(carBytes, { 75 148 status: 200,