atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add layered remote verification for peer block availability

Implements content-addressed verification via RASL endpoints to prove
remote peers actually host the blocks they claim to serve. Tracks
replicated block CIDs per-DID and runs verification on a separate timer.

+868 -8
+96 -5
src/replication/replication-manager.ts
··· 17 17 type PeerIdentityRecord, 18 18 type ManifestRecord, 19 19 type SyncState, 20 + type VerificationConfig, 21 + type LayeredVerificationResult, 22 + DEFAULT_VERIFICATION_CONFIG, 20 23 } from "./types.js"; 21 24 import { SyncStorage } from "./sync-storage.js"; 22 25 import { RepoFetcher } from "./repo-fetcher.js"; 23 26 import { PeerDiscovery } from "./peer-discovery.js"; 24 - import { BlockVerifier } from "./verification.js"; 27 + import { BlockVerifier, RemoteVerifier } from "./verification.js"; 25 28 26 29 /** How old cached peer info can be before re-fetching (1 hour). */ 27 30 const PEER_INFO_TTL_MS = 60 * 60 * 1000; ··· 31 34 private repoFetcher: RepoFetcher; 32 35 private peerDiscovery: PeerDiscovery; 33 36 private verifier: BlockVerifier; 37 + private remoteVerifier: RemoteVerifier; 34 38 private syncTimer: ReturnType<typeof setInterval> | null = null; 39 + private verificationTimer: ReturnType<typeof setInterval> | null = null; 40 + private verificationConfig: VerificationConfig; 41 + private lastVerificationResults: Map<string, LayeredVerificationResult> = 42 + new Map(); 35 43 private stopped = false; 36 44 37 45 constructor( ··· 40 48 private repoManager: RepoManager, 41 49 private ipfsService: IpfsService, 42 50 private didResolver: DidResolver, 51 + verificationConfig?: Partial<VerificationConfig>, 43 52 ) { 44 53 this.syncStorage = new SyncStorage(db); 45 54 this.repoFetcher = new RepoFetcher(didResolver); 46 55 this.peerDiscovery = new PeerDiscovery(this.repoFetcher); 47 56 this.verifier = new BlockVerifier(ipfsService); 57 + this.verificationConfig = { 58 + ...DEFAULT_VERIFICATION_CONFIG, 59 + ...verificationConfig, 60 + }; 61 + this.remoteVerifier = new RemoteVerifier( 62 + ipfsService, 63 + this.verificationConfig, 64 + ); 48 65 } 49 66 50 67 /** ··· 195 212 } 196 213 } 197 214 215 + // 5b. Track block CIDs for remote verification 216 + if (cidStrs.length > 0) { 217 + this.syncStorage.trackBlocks(did, cidStrs); 218 + } 219 + 198 220 // 6. Announce to DHT (fire-and-forget) 199 221 this.ipfsService.provideBlocks(cidStrs).catch(() => {}); 200 222 201 - // 7. Verify block availability 223 + // 7. Verify local block availability 202 224 const verification = 203 225 await this.verifier.verifyBlockAvailability(cidStrs); 204 226 if (verification.missing.length > 0) { 205 227 console.warn( 206 - `Verification: ${verification.missing.length}/${verification.checked} blocks missing for ${did}`, 228 + `Local verification: ${verification.missing.length}/${verification.checked} blocks missing for ${did}`, 207 229 ); 208 230 } 209 231 this.syncStorage.updateVerifiedAt(did); ··· 237 259 } 238 260 239 261 /** 240 - * Start periodic sync at the given interval. 262 + * Start periodic sync and verification at their respective intervals. 241 263 */ 242 264 startPeriodicSync(intervalMs: number = 5 * 60 * 1000): void { 243 265 if (this.syncTimer) return; ··· 259 281 }); 260 282 } 261 283 }, intervalMs); 284 + 285 + // Start verification on a separate timer 286 + this.verificationTimer = setInterval(() => { 287 + if (!this.stopped) { 288 + this.runVerification().catch((err) => { 289 + console.error("Periodic verification error:", err); 290 + }); 291 + } 292 + }, this.verificationConfig.verificationIntervalMs); 262 293 } 263 294 264 295 /** 265 - * Stop periodic sync. 296 + * Stop periodic sync and verification. 266 297 */ 267 298 stop(): void { 268 299 this.stopped = true; ··· 270 301 clearInterval(this.syncTimer); 271 302 this.syncTimer = null; 272 303 } 304 + if (this.verificationTimer) { 305 + clearInterval(this.verificationTimer); 306 + this.verificationTimer = null; 307 + } 308 + } 309 + 310 + /** 311 + * Run remote verification for all synced DIDs. 312 + */ 313 + async runVerification(): Promise<LayeredVerificationResult[]> { 314 + const results: LayeredVerificationResult[] = []; 315 + 316 + for (const did of this.config.REPLICATE_DIDS) { 317 + if (this.stopped) break; 318 + try { 319 + const result = await this.verifyDid(did); 320 + if (result) { 321 + results.push(result); 322 + this.lastVerificationResults.set(did, result); 323 + } 324 + } catch (err) { 325 + console.error(`Verification error for ${did}:`, err); 326 + } 327 + } 328 + 329 + return results; 330 + } 331 + 332 + /** 333 + * Run layered verification for a single DID against its source PDS. 334 + */ 335 + async verifyDid(did: string): Promise<LayeredVerificationResult | null> { 336 + const state = this.syncStorage.getState(did); 337 + if (!state || !state.pdsEndpoint) return null; 338 + 339 + // Get tracked block CIDs for this DID 340 + const blockCids = this.syncStorage.getBlockCids(did); 341 + 342 + // Use the last sync rev as the root CID 343 + const rootCid = state.lastSyncRev ?? null; 344 + 345 + const result = await this.remoteVerifier.verifyPeer( 346 + did, 347 + state.pdsEndpoint, 348 + rootCid, 349 + blockCids, 350 + ); 351 + 352 + if (result.overallPassed) { 353 + this.syncStorage.updateVerifiedAt(did); 354 + } 355 + 356 + return result; 273 357 } 274 358 275 359 /** ··· 277 361 */ 278 362 getSyncStates(): SyncState[] { 279 363 return this.syncStorage.getAllStates(); 364 + } 365 + 366 + /** 367 + * Get the most recent verification results. 368 + */ 369 + getVerificationResults(): Map<string, LayeredVerificationResult> { 370 + return this.lastVerificationResults; 280 371 } 281 372 282 373 /**
+485 -1
src/replication/replication.test.ts
··· 20 20 rkeyToDid, 21 21 PEER_NSID, 22 22 MANIFEST_NSID, 23 + DEFAULT_VERIFICATION_CONFIG, 23 24 } from "./types.js"; 24 - import { BlockVerifier } from "./verification.js"; 25 + import { BlockVerifier, RemoteVerifier } from "./verification.js"; 25 26 import { RepoFetcher, extractPdsEndpoint } from "./repo-fetcher.js"; 26 27 import { PeerDiscovery } from "./peer-discovery.js"; 27 28 ··· 590 591 expect(verification.missing).toEqual([]); 591 592 }); 592 593 594 + it("tracks block CIDs for verification", async () => { 595 + // 1. Create records in source 596 + await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 597 + $type: "app.bsky.feed.post", 598 + text: "tracking test", 599 + createdAt: new Date().toISOString(), 600 + }); 601 + 602 + // 2. Export as CAR, parse, store blocks 603 + const carBytes = await sourceRepo.getRepoCar(); 604 + const { blocks } = await readCarWithRoot(carBytes); 605 + await replicaIpfs.putBlocks(blocks); 606 + 607 + // 3. Collect CID strings 608 + const internalMap = ( 609 + blocks as unknown as { map: Map<string, Uint8Array> } 610 + ).map; 611 + const cidStrs = Array.from(internalMap.keys()); 612 + 613 + // 4. Track blocks in sync storage 614 + const syncStorage = new SyncStorage(replicaDb); 615 + syncStorage.initSchema(); 616 + syncStorage.trackBlocks("did:plc:test123", cidStrs); 617 + 618 + // 5. Verify tracking 619 + expect(syncStorage.getBlockCount("did:plc:test123")).toBe( 620 + cidStrs.length, 621 + ); 622 + const stored = syncStorage.getBlockCids("did:plc:test123"); 623 + expect(stored.sort()).toEqual(cidStrs.sort()); 624 + 625 + // 6. Tracking is idempotent 626 + syncStorage.trackBlocks("did:plc:test123", cidStrs); 627 + expect(syncStorage.getBlockCount("did:plc:test123")).toBe( 628 + cidStrs.length, 629 + ); 630 + 631 + // 7. Clear blocks 632 + syncStorage.clearBlocks("did:plc:test123"); 633 + expect(syncStorage.getBlockCount("did:plc:test123")).toBe(0); 634 + }); 635 + 593 636 it("manifest record updated with sync rev after replication", async () => { 594 637 // Create a manifest record in replica 595 638 const sourceDid = "did:plc:test123"; ··· 634 677 expect(record.status).toBe("active"); 635 678 }); 636 679 }); 680 + 681 + // ============================================ 682 + // SyncStorage: Block Tracking 683 + // ============================================ 684 + 685 + describe("SyncStorage block tracking", () => { 686 + let tmpDir: string; 687 + let db: InstanceType<typeof Database>; 688 + let storage: SyncStorage; 689 + 690 + beforeEach(() => { 691 + tmpDir = mkdtempSync(join(tmpdir(), "block-tracking-test-")); 692 + db = new Database(join(tmpDir, "test.db")); 693 + storage = new SyncStorage(db); 694 + storage.initSchema(); 695 + }); 696 + 697 + afterEach(() => { 698 + db.close(); 699 + rmSync(tmpDir, { recursive: true, force: true }); 700 + }); 701 + 702 + it("tracks blocks for a DID", () => { 703 + storage.trackBlocks("did:plc:a", ["cid1", "cid2", "cid3"]); 704 + expect(storage.getBlockCount("did:plc:a")).toBe(3); 705 + expect(storage.getBlockCids("did:plc:a").sort()).toEqual([ 706 + "cid1", 707 + "cid2", 708 + "cid3", 709 + ]); 710 + }); 711 + 712 + it("ignores duplicate CIDs", () => { 713 + storage.trackBlocks("did:plc:a", ["cid1", "cid2"]); 714 + storage.trackBlocks("did:plc:a", ["cid2", "cid3"]); 715 + expect(storage.getBlockCount("did:plc:a")).toBe(3); 716 + }); 717 + 718 + it("tracks blocks per-DID independently", () => { 719 + storage.trackBlocks("did:plc:a", ["cid1", "cid2"]); 720 + storage.trackBlocks("did:plc:b", ["cid2", "cid3"]); 721 + expect(storage.getBlockCount("did:plc:a")).toBe(2); 722 + expect(storage.getBlockCount("did:plc:b")).toBe(2); 723 + }); 724 + 725 + it("clears blocks for a specific DID", () => { 726 + storage.trackBlocks("did:plc:a", ["cid1"]); 727 + storage.trackBlocks("did:plc:b", ["cid2"]); 728 + storage.clearBlocks("did:plc:a"); 729 + expect(storage.getBlockCount("did:plc:a")).toBe(0); 730 + expect(storage.getBlockCount("did:plc:b")).toBe(1); 731 + }); 732 + 733 + it("returns 0 count for unknown DID", () => { 734 + expect(storage.getBlockCount("did:plc:unknown")).toBe(0); 735 + }); 736 + 737 + it("returns empty array for unknown DID", () => { 738 + expect(storage.getBlockCids("did:plc:unknown")).toEqual([]); 739 + }); 740 + 741 + it("handles empty CID array", () => { 742 + storage.trackBlocks("did:plc:a", []); 743 + expect(storage.getBlockCount("did:plc:a")).toBe(0); 744 + }); 745 + }); 746 + 747 + // ============================================ 748 + // RemoteVerifier 749 + // ============================================ 750 + 751 + describe("RemoteVerifier", () => { 752 + let tmpDir: string; 753 + let ipfsService: IpfsService; 754 + 755 + beforeEach(async () => { 756 + tmpDir = mkdtempSync(join(tmpdir(), "remote-verifier-test-")); 757 + ipfsService = new IpfsService({ 758 + blocksPath: join(tmpDir, "blocks"), 759 + datastorePath: join(tmpDir, "datastore"), 760 + networking: false, 761 + }); 762 + await ipfsService.start(); 763 + }); 764 + 765 + afterEach(async () => { 766 + if (ipfsService.isRunning()) { 767 + await ipfsService.stop(); 768 + } 769 + rmSync(tmpDir, { recursive: true, force: true }); 770 + }); 771 + 772 + it("Layer 0: passes when commit root is served via RASL", async () => { 773 + const bytes = new TextEncoder().encode("root-block"); 774 + const cidStr = await makeCidStr(bytes); 775 + await ipfsService.putBlock(cidStr, bytes); 776 + 777 + const mockFetch = async (url: string | URL | Request) => { 778 + const urlStr = typeof url === "string" ? url : url.toString(); 779 + if (urlStr.includes(cidStr)) { 780 + return new Response(bytes, { status: 200 }); 781 + } 782 + return new Response(null, { status: 404 }); 783 + }; 784 + 785 + const verifier = new RemoteVerifier( 786 + ipfsService, 787 + { raslSampleSize: 10 }, 788 + mockFetch as unknown as typeof fetch, 789 + ); 790 + 791 + const result = await verifier.verifyPeer( 792 + "did:plc:test", 793 + "https://pds.example.com", 794 + cidStr, 795 + [], 796 + ); 797 + 798 + const layer0 = result.layers.find((l) => l.layer === 0); 799 + expect(layer0).toBeDefined(); 800 + expect(layer0!.passed).toBe(true); 801 + expect(layer0!.available).toBe(1); 802 + }); 803 + 804 + it("Layer 0: fails when commit root returns 404", async () => { 805 + const bytes = new TextEncoder().encode("missing-root"); 806 + const cidStr = await makeCidStr(bytes); 807 + 808 + const mockFetch = async () => { 809 + return new Response(null, { status: 404 }); 810 + }; 811 + 812 + const verifier = new RemoteVerifier( 813 + ipfsService, 814 + {}, 815 + mockFetch as unknown as typeof fetch, 816 + ); 817 + 818 + const result = await verifier.verifyPeer( 819 + "did:plc:test", 820 + "https://pds.example.com", 821 + cidStr, 822 + [], 823 + ); 824 + 825 + const layer0 = result.layers.find((l) => l.layer === 0); 826 + expect(layer0!.passed).toBe(false); 827 + expect(layer0!.missing).toContain(cidStr); 828 + }); 829 + 830 + it("Layer 0: fails on network error", async () => { 831 + const bytes = new TextEncoder().encode("error-root"); 832 + const cidStr = await makeCidStr(bytes); 833 + 834 + const mockFetch = async () => { 835 + throw new Error("Connection refused"); 836 + }; 837 + 838 + const verifier = new RemoteVerifier( 839 + ipfsService, 840 + {}, 841 + mockFetch as unknown as typeof fetch, 842 + ); 843 + 844 + const result = await verifier.verifyPeer( 845 + "did:plc:test", 846 + "https://pds.example.com", 847 + cidStr, 848 + [], 849 + ); 850 + 851 + const layer0 = result.layers.find((l) => l.layer === 0); 852 + expect(layer0!.passed).toBe(false); 853 + expect(layer0!.error).toBe("Connection refused"); 854 + }); 855 + 856 + it("Layer 1: passes when all sampled blocks match local", async () => { 857 + const blocks: Array<{ cid: string; bytes: Uint8Array }> = []; 858 + for (let i = 0; i < 5; i++) { 859 + const bytes = new TextEncoder().encode(`rasl-block-${i}`); 860 + const cidStr = await makeCidStr(bytes); 861 + await ipfsService.putBlock(cidStr, bytes); 862 + blocks.push({ cid: cidStr, bytes }); 863 + } 864 + 865 + const mockFetch = async (url: string | URL | Request) => { 866 + const urlStr = typeof url === "string" ? url : url.toString(); 867 + const block = blocks.find((b) => urlStr.includes(b.cid)); 868 + if (block) { 869 + return new Response(block.bytes, { status: 200 }); 870 + } 871 + return new Response(null, { status: 404 }); 872 + }; 873 + 874 + const verifier = new RemoteVerifier( 875 + ipfsService, 876 + { raslSampleSize: 100 }, 877 + mockFetch as unknown as typeof fetch, 878 + ); 879 + 880 + const result = await verifier.verifyPeer( 881 + "did:plc:test", 882 + "https://pds.example.com", 883 + null, 884 + blocks.map((b) => b.cid), 885 + ); 886 + 887 + const layer1 = result.layers.find((l) => l.layer === 1); 888 + expect(layer1).toBeDefined(); 889 + expect(layer1!.passed).toBe(true); 890 + expect(layer1!.checked).toBe(5); 891 + expect(layer1!.available).toBe(5); 892 + }); 893 + 894 + it("Layer 1: fails when some blocks return 404", async () => { 895 + const presentBytes = new TextEncoder().encode("present-block"); 896 + const presentCid = await makeCidStr(presentBytes); 897 + await ipfsService.putBlock(presentCid, presentBytes); 898 + 899 + const missingBytes = new TextEncoder().encode("missing-block"); 900 + const missingCid = await makeCidStr(missingBytes); 901 + await ipfsService.putBlock(missingCid, missingBytes); 902 + 903 + const mockFetch = async (url: string | URL | Request) => { 904 + const urlStr = typeof url === "string" ? url : url.toString(); 905 + if (urlStr.includes(presentCid)) { 906 + return new Response(presentBytes, { status: 200 }); 907 + } 908 + return new Response(null, { status: 404 }); 909 + }; 910 + 911 + const verifier = new RemoteVerifier( 912 + ipfsService, 913 + { raslSampleSize: 100 }, 914 + mockFetch as unknown as typeof fetch, 915 + ); 916 + 917 + const result = await verifier.verifyPeer( 918 + "did:plc:test", 919 + "https://pds.example.com", 920 + null, 921 + [presentCid, missingCid], 922 + ); 923 + 924 + const layer1 = result.layers.find((l) => l.layer === 1); 925 + expect(layer1!.passed).toBe(false); 926 + expect(layer1!.available).toBe(1); 927 + expect(layer1!.missing).toContain(missingCid); 928 + }); 929 + 930 + it("Layer 1: detects content mismatch", async () => { 931 + const bytes = new TextEncoder().encode("real-content"); 932 + const cidStr = await makeCidStr(bytes); 933 + await ipfsService.putBlock(cidStr, bytes); 934 + 935 + const wrongBytes = new TextEncoder().encode("tampered-content"); 936 + 937 + const mockFetch = async () => { 938 + return new Response(wrongBytes, { status: 200 }); 939 + }; 940 + 941 + const verifier = new RemoteVerifier( 942 + ipfsService, 943 + { raslSampleSize: 100 }, 944 + mockFetch as unknown as typeof fetch, 945 + ); 946 + 947 + const result = await verifier.verifyPeer( 948 + "did:plc:test", 949 + "https://pds.example.com", 950 + null, 951 + [cidStr], 952 + ); 953 + 954 + const layer1 = result.layers.find((l) => l.layer === 1); 955 + expect(layer1!.passed).toBe(false); 956 + expect(layer1!.missing).toContain(cidStr); 957 + }); 958 + 959 + it("combined: overallPassed is true when all layers pass", async () => { 960 + const bytes = new TextEncoder().encode("all-pass"); 961 + const cidStr = await makeCidStr(bytes); 962 + await ipfsService.putBlock(cidStr, bytes); 963 + 964 + const mockFetch = async (url: string | URL | Request) => { 965 + const urlStr = typeof url === "string" ? url : url.toString(); 966 + if (urlStr.includes(cidStr)) { 967 + return new Response(bytes, { status: 200 }); 968 + } 969 + return new Response(null, { status: 404 }); 970 + }; 971 + 972 + const verifier = new RemoteVerifier( 973 + ipfsService, 974 + { raslSampleSize: 100 }, 975 + mockFetch as unknown as typeof fetch, 976 + ); 977 + 978 + const result = await verifier.verifyPeer( 979 + "did:plc:test", 980 + "https://pds.example.com", 981 + cidStr, 982 + [cidStr], 983 + ); 984 + 985 + expect(result.overallPassed).toBe(true); 986 + expect(result.did).toBe("did:plc:test"); 987 + expect(result.pdsEndpoint).toBe("https://pds.example.com"); 988 + expect(result.layers.length).toBe(4); // 0, 1, 2 (stub), 3 (stub) 989 + }); 990 + 991 + it("combined: overallPassed is false when any layer fails", async () => { 992 + const bytes = new TextEncoder().encode("fail-test"); 993 + const cidStr = await makeCidStr(bytes); 994 + 995 + const mockFetch = async () => { 996 + return new Response(null, { status: 404 }); 997 + }; 998 + 999 + const verifier = new RemoteVerifier( 1000 + ipfsService, 1001 + {}, 1002 + mockFetch as unknown as typeof fetch, 1003 + ); 1004 + 1005 + const result = await verifier.verifyPeer( 1006 + "did:plc:test", 1007 + "https://pds.example.com", 1008 + cidStr, 1009 + [cidStr], 1010 + ); 1011 + 1012 + expect(result.overallPassed).toBe(false); 1013 + }); 1014 + 1015 + it("skips Layer 0 when rootCid is null", async () => { 1016 + const mockFetch = async () => { 1017 + return new Response(null, { status: 404 }); 1018 + }; 1019 + 1020 + const verifier = new RemoteVerifier( 1021 + ipfsService, 1022 + {}, 1023 + mockFetch as unknown as typeof fetch, 1024 + ); 1025 + 1026 + const result = await verifier.verifyPeer( 1027 + "did:plc:test", 1028 + "https://pds.example.com", 1029 + null, 1030 + [], 1031 + ); 1032 + 1033 + expect(result.layers.find((l) => l.layer === 0)).toBeUndefined(); 1034 + }); 1035 + 1036 + it("skips Layer 1 when blockCids is empty", async () => { 1037 + const mockFetch = async () => { 1038 + return new Response(null, { status: 200 }); 1039 + }; 1040 + 1041 + const verifier = new RemoteVerifier( 1042 + ipfsService, 1043 + {}, 1044 + mockFetch as unknown as typeof fetch, 1045 + ); 1046 + 1047 + const result = await verifier.verifyPeer( 1048 + "did:plc:test", 1049 + "https://pds.example.com", 1050 + null, 1051 + [], 1052 + ); 1053 + 1054 + expect(result.layers.find((l) => l.layer === 1)).toBeUndefined(); 1055 + }); 1056 + 1057 + it("Layer 2 and 3 are stubs", async () => { 1058 + const mockFetch = async () => { 1059 + return new Response(null, { status: 200 }); 1060 + }; 1061 + 1062 + const verifier = new RemoteVerifier( 1063 + ipfsService, 1064 + {}, 1065 + mockFetch as unknown as typeof fetch, 1066 + ); 1067 + 1068 + const result = await verifier.verifyPeer( 1069 + "did:plc:test", 1070 + "https://pds.example.com", 1071 + null, 1072 + [], 1073 + ); 1074 + 1075 + const layer2 = result.layers.find((l) => l.layer === 2); 1076 + expect(layer2).toBeDefined(); 1077 + expect(layer2!.checked).toBe(0); 1078 + expect(layer2!.error).toContain("not implemented"); 1079 + 1080 + const layer3 = result.layers.find((l) => l.layer === 3); 1081 + expect(layer3).toBeDefined(); 1082 + expect(layer3!.checked).toBe(0); 1083 + expect(layer3!.error).toContain("not implemented"); 1084 + }); 1085 + 1086 + it("respects raslSampleSize config", async () => { 1087 + const blocks: Array<{ cid: string; bytes: Uint8Array }> = []; 1088 + for (let i = 0; i < 20; i++) { 1089 + const bytes = new TextEncoder().encode(`sample-block-${i}`); 1090 + const cidStr = await makeCidStr(bytes); 1091 + await ipfsService.putBlock(cidStr, bytes); 1092 + blocks.push({ cid: cidStr, bytes }); 1093 + } 1094 + 1095 + const mockFetch = async (url: string | URL | Request) => { 1096 + const urlStr = typeof url === "string" ? url : url.toString(); 1097 + const block = blocks.find((b) => urlStr.includes(b.cid)); 1098 + if (block) { 1099 + return new Response(block.bytes, { status: 200 }); 1100 + } 1101 + return new Response(null, { status: 404 }); 1102 + }; 1103 + 1104 + const verifier = new RemoteVerifier( 1105 + ipfsService, 1106 + { raslSampleSize: 5 }, 1107 + mockFetch as unknown as typeof fetch, 1108 + ); 1109 + 1110 + const result = await verifier.verifyPeer( 1111 + "did:plc:test", 1112 + "https://pds.example.com", 1113 + null, 1114 + blocks.map((b) => b.cid), 1115 + ); 1116 + 1117 + const layer1 = result.layers.find((l) => l.layer === 1); 1118 + expect(layer1!.checked).toBe(5); 1119 + }); 1120 + });
+54 -1
src/replication/sync-storage.ts
··· 10 10 constructor(private db: Database.Database) {} 11 11 12 12 /** 13 - * Create the replication_state table if it doesn't exist. 13 + * Create the replication tables if they don't exist. 14 14 */ 15 15 initSchema(): void { 16 16 this.db.exec(` ··· 24 24 last_verified_at TEXT, 25 25 status TEXT NOT NULL DEFAULT 'pending', 26 26 error_message TEXT 27 + ); 28 + 29 + CREATE TABLE IF NOT EXISTS replication_blocks ( 30 + did TEXT NOT NULL, 31 + cid TEXT NOT NULL, 32 + PRIMARY KEY (did, cid) 27 33 ); 28 34 `); 29 35 } ··· 142 148 SET last_verified_at = datetime('now') 143 149 WHERE did = ?`, 144 150 ) 151 + .run(did); 152 + } 153 + 154 + /** 155 + * Track block CIDs for a DID (batch insert, ignores duplicates). 156 + */ 157 + trackBlocks(did: string, cids: string[]): void { 158 + if (cids.length === 0) return; 159 + const insert = this.db.prepare( 160 + "INSERT OR IGNORE INTO replication_blocks (did, cid) VALUES (?, ?)", 161 + ); 162 + const batch = this.db.transaction((items: string[]) => { 163 + for (const cid of items) { 164 + insert.run(did, cid); 165 + } 166 + }); 167 + batch(cids); 168 + } 169 + 170 + /** 171 + * Get all tracked block CIDs for a DID. 172 + */ 173 + getBlockCids(did: string): string[] { 174 + const rows = this.db 175 + .prepare("SELECT cid FROM replication_blocks WHERE did = ?") 176 + .all(did) as Array<{ cid: string }>; 177 + return rows.map((r) => r.cid); 178 + } 179 + 180 + /** 181 + * Get the count of tracked blocks for a DID. 182 + */ 183 + getBlockCount(did: string): number { 184 + const row = this.db 185 + .prepare( 186 + "SELECT COUNT(*) as count FROM replication_blocks WHERE did = ?", 187 + ) 188 + .get(did) as { count: number }; 189 + return row.count; 190 + } 191 + 192 + /** 193 + * Clear all tracked blocks for a DID. 194 + */ 195 + clearBlocks(did: string): void { 196 + this.db 197 + .prepare("DELETE FROM replication_blocks WHERE did = ?") 145 198 .run(did); 146 199 } 147 200
+40
src/replication/types.ts
··· 52 52 export function rkeyToDid(rkey: string): string { 53 53 return rkey.replace(/-/g, ":"); 54 54 } 55 + 56 + /** Configuration for layered remote verification. */ 57 + export interface VerificationConfig { 58 + /** Layer 1: how many blocks to sample via RASL (default 50). */ 59 + raslSampleSize: number; 60 + /** Layer 2: how many blocks to fetch via bitswap (default 5). Future use. */ 61 + bitswapSampleSize: number; 62 + /** Layer 3: how many record paths to verify (default 2). Future use. */ 63 + mstProofCount: number; 64 + /** How often to run verification in ms (default 30 minutes). */ 65 + verificationIntervalMs: number; 66 + } 67 + 68 + export const DEFAULT_VERIFICATION_CONFIG: VerificationConfig = { 69 + raslSampleSize: 50, 70 + bitswapSampleSize: 5, 71 + mstProofCount: 2, 72 + verificationIntervalMs: 30 * 60 * 1000, 73 + }; 74 + 75 + /** Result of a single verification layer. */ 76 + export interface LayerResult { 77 + layer: number; 78 + name: string; 79 + passed: boolean; 80 + checked: number; 81 + available: number; 82 + missing: string[]; 83 + error?: string; 84 + durationMs: number; 85 + } 86 + 87 + /** Combined result of all verification layers for a peer. */ 88 + export interface LayeredVerificationResult { 89 + did: string; 90 + pdsEndpoint: string; 91 + timestamp: string; 92 + layers: LayerResult[]; 93 + overallPassed: boolean; 94 + }
+193 -1
src/replication/verification.ts
··· 1 1 /** 2 - * Spot-check block availability in IPFS blockstore. 2 + * Block verification: local spot-checks and layered remote verification. 3 3 */ 4 4 5 5 import type { IpfsService } from "../ipfs.js"; 6 + import { 7 + type VerificationConfig, 8 + type LayerResult, 9 + type LayeredVerificationResult, 10 + DEFAULT_VERIFICATION_CONFIG, 11 + } from "./types.js"; 6 12 7 13 export interface VerificationResult { 8 14 checked: number; ··· 60 66 return shuffled.slice(-size); 61 67 } 62 68 } 69 + 70 + /** 71 + * Layered remote verification. 72 + * 73 + * Verifies that a remote PDS is actually hosting blocks it claims to serve. 74 + * Uses content-addressed retrieval (RASL endpoint) so responses are unforgeable. 75 + * 76 + * Layer 0: Commit root — fetch the root CID via RASL to confirm the remote serves it. 77 + * Layer 1: RASL sampling — fetch random blocks via HTTP, compare with local copy. 78 + * Layer 2: Bitswap (future) — fetch blocks via IPFS network. 79 + * Layer 3: MST path proof (future) — verify MST path proofs via sync.getRecord. 80 + */ 81 + export class RemoteVerifier { 82 + private config: VerificationConfig; 83 + private ipfsService: IpfsService; 84 + private fetchFn: typeof fetch; 85 + 86 + constructor( 87 + ipfsService: IpfsService, 88 + config?: Partial<VerificationConfig>, 89 + fetchFn?: typeof fetch, 90 + ) { 91 + this.ipfsService = ipfsService; 92 + this.config = { ...DEFAULT_VERIFICATION_CONFIG, ...config }; 93 + this.fetchFn = fetchFn ?? fetch; 94 + } 95 + 96 + /** 97 + * Run all verification layers against a remote peer. 98 + */ 99 + async verifyPeer( 100 + did: string, 101 + pdsEndpoint: string, 102 + rootCid: string | null, 103 + blockCids: string[], 104 + ): Promise<LayeredVerificationResult> { 105 + const layers: LayerResult[] = []; 106 + 107 + // Layer 0: Commit root fetch 108 + if (rootCid) { 109 + layers.push(await this.verifyCommitRoot(pdsEndpoint, rootCid)); 110 + } 111 + 112 + // Layer 1: RASL sampling 113 + if (blockCids.length > 0) { 114 + layers.push(await this.verifyViaRasl(pdsEndpoint, blockCids)); 115 + } 116 + 117 + // Layer 2: Bitswap (stub — requires lower-level libp2p APIs) 118 + layers.push({ 119 + layer: 2, 120 + name: "bitswap", 121 + passed: true, 122 + checked: 0, 123 + available: 0, 124 + missing: [], 125 + error: "not implemented: requires IPFS networking", 126 + durationMs: 0, 127 + }); 128 + 129 + // Layer 3: MST path proof (stub — requires sync.getRecord + CAR verification) 130 + layers.push({ 131 + layer: 3, 132 + name: "mst-proof", 133 + passed: true, 134 + checked: 0, 135 + available: 0, 136 + missing: [], 137 + error: "not implemented: future enhancement", 138 + durationMs: 0, 139 + }); 140 + 141 + return { 142 + did, 143 + pdsEndpoint, 144 + timestamp: new Date().toISOString(), 145 + layers, 146 + overallPassed: layers.every((l) => l.passed), 147 + }; 148 + } 149 + 150 + /** 151 + * Layer 0: Fetch the commit root CID via RASL. 152 + * A 200 with correct bytes proves the remote serves the current repo head. 153 + */ 154 + private async verifyCommitRoot( 155 + pdsEndpoint: string, 156 + rootCid: string, 157 + ): Promise<LayerResult> { 158 + const start = Date.now(); 159 + try { 160 + const url = `${pdsEndpoint}/.well-known/rasl/${rootCid}`; 161 + const res = await this.fetchFn(url); 162 + const passed = res.status === 200; 163 + return { 164 + layer: 0, 165 + name: "commit-root", 166 + passed, 167 + checked: 1, 168 + available: passed ? 1 : 0, 169 + missing: passed ? [] : [rootCid], 170 + durationMs: Date.now() - start, 171 + }; 172 + } catch (err) { 173 + return { 174 + layer: 0, 175 + name: "commit-root", 176 + passed: false, 177 + checked: 1, 178 + available: 0, 179 + missing: [rootCid], 180 + error: err instanceof Error ? err.message : String(err), 181 + durationMs: Date.now() - start, 182 + }; 183 + } 184 + } 185 + 186 + /** 187 + * Layer 1: Fetch a random sample of blocks via RASL and verify against local copies. 188 + * Content-addressed retrieval is unforgeable: correct bytes = peer has the data. 189 + */ 190 + private async verifyViaRasl( 191 + pdsEndpoint: string, 192 + cids: string[], 193 + ): Promise<LayerResult> { 194 + const start = Date.now(); 195 + const sampleSize = Math.min(this.config.raslSampleSize, cids.length); 196 + const sample = 197 + sampleSize >= cids.length 198 + ? [...cids] 199 + : this.randomSample(cids, sampleSize); 200 + 201 + const missing: string[] = []; 202 + let available = 0; 203 + 204 + for (const cid of sample) { 205 + try { 206 + const url = `${pdsEndpoint}/.well-known/rasl/${cid}`; 207 + const res = await this.fetchFn(url); 208 + if (res.status === 200) { 209 + const remoteBytes = new Uint8Array(await res.arrayBuffer()); 210 + const localBytes = await this.ipfsService.getBlock(cid); 211 + if ( 212 + localBytes && 213 + Buffer.from(remoteBytes).equals(Buffer.from(localBytes)) 214 + ) { 215 + available++; 216 + } else if (!localBytes) { 217 + // We don't have it locally to compare, but remote served it 218 + available++; 219 + } else { 220 + // Content mismatch — serious integrity issue 221 + missing.push(cid); 222 + } 223 + } else { 224 + missing.push(cid); 225 + } 226 + } catch { 227 + missing.push(cid); 228 + } 229 + } 230 + 231 + return { 232 + layer: 1, 233 + name: "rasl-sampling", 234 + passed: missing.length === 0, 235 + checked: sample.length, 236 + available, 237 + missing, 238 + durationMs: Date.now() - start, 239 + }; 240 + } 241 + 242 + private randomSample(arr: string[], size: number): string[] { 243 + const shuffled = [...arr]; 244 + for ( 245 + let i = shuffled.length - 1; 246 + i > 0 && i >= shuffled.length - size; 247 + i-- 248 + ) { 249 + const j = Math.floor(Math.random() * (i + 1)); 250 + [shuffled[i], shuffled[j]] = [shuffled[j]!, shuffled[i]!]; 251 + } 252 + return shuffled.slice(-size); 253 + } 254 + }