atproto user agency toolkit for individuals and groups
7
fork

Configure Feed

Select the types of activity you want to include in your feed.

Enable P2P repo serving and peer fallback fetch

Extend getRepo and getBlocks XRPC endpoints to serve replicated DID
data from the BlockStore, enabling other P2PDS nodes to sync repos
from peers instead of only from the source PDS.

Add peer_endpoints table to track which peers have which DIDs,
populated during manifest discovery. When syncDid() fails to reach
the source PDS, it now falls back to fetching from known peer
endpoints sorted by freshest data.

+788 -33
+2 -2
src/index.ts
··· 204 204 // Sync endpoints (federation) 205 205 // ============================================ 206 206 app.get("/xrpc/com.atproto.sync.getRepo", (c) => 207 - sync.getRepo(c, repoManager), 207 + sync.getRepo(c, repoManager, blockStore, replicationManager?.getSyncStorage()), 208 208 ); 209 209 app.get("/xrpc/com.atproto.sync.getRepoStatus", (c) => 210 210 sync.getRepoStatus(c, repoManager, replicatedRepoReader), 211 211 ); 212 212 app.get("/xrpc/com.atproto.sync.getBlocks", (c) => 213 - sync.getBlocks(c, repoManager), 213 + sync.getBlocks(c, repoManager, blockStore, replicationManager?.getSyncStorage()), 214 214 ); 215 215 app.get("/xrpc/com.atproto.sync.getBlob", (c) => 216 216 sync.getBlob(c, repoManager, blockStore, replicationManager?.getSyncStorage()),
+88 -5
src/replication/replication-manager.ts
··· 128 128 this.challengeStorage.initSchema(); 129 129 await this.publishPeerIdentity(); 130 130 await this.syncManifests(); 131 + await this.discoverPeerEndpoints(); 131 132 await this.runOfferDiscovery(); 132 133 } 133 134 ··· 243 244 } 244 245 245 246 /** 247 + * Discover peer endpoints by scanning manifests of known peers. 248 + * For each peer whose manifest includes a DID we're tracking, record 249 + * that peer as a potential fallback source for that DID. 250 + */ 251 + private async discoverPeerEndpoints(): Promise<void> { 252 + const trackedDids = new Set(this.getReplicateDids()); 253 + const states = this.syncStorage.getAllStates(); 254 + 255 + for (const state of states) { 256 + if (!state.pdsEndpoint) continue; 257 + 258 + try { 259 + const manifests = await this.peerDiscovery.discoverManifests( 260 + state.did, 261 + state.pdsEndpoint, 262 + ); 263 + 264 + for (const manifest of manifests) { 265 + if ( 266 + manifest.subject && 267 + trackedDids.has(manifest.subject) && 268 + manifest.status === "active" 269 + ) { 270 + this.syncStorage.upsertPeerEndpoint( 271 + manifest.subject, 272 + state.did, 273 + state.pdsEndpoint, 274 + manifest.lastSyncRev ?? null, 275 + ); 276 + } 277 + } 278 + } catch { 279 + // Non-fatal: peer endpoint discovery is best-effort 280 + } 281 + } 282 + } 283 + 284 + /** 285 + * Fetch a repo from known peer endpoints when the source PDS is unavailable. 286 + * Tries peers in order of freshest data (highest lastSyncRev). 287 + * Throws the original error if no peers succeed. 288 + */ 289 + private async fetchFromPeersOrThrow( 290 + did: string, 291 + since: string | undefined, 292 + originalError: Error, 293 + ): Promise<Uint8Array> { 294 + const peers = this.syncStorage.getPeerEndpoints(did); 295 + if (peers.length === 0) throw originalError; 296 + 297 + // Sort by lastSyncRev descending (prefer freshest data) 298 + peers.sort((a, b) => { 299 + if (!a.lastSyncRev && !b.lastSyncRev) return 0; 300 + if (!a.lastSyncRev) return 1; 301 + if (!b.lastSyncRev) return -1; 302 + return b.lastSyncRev.localeCompare(a.lastSyncRev); 303 + }); 304 + 305 + for (const peer of peers) { 306 + try { 307 + return await this.repoFetcher.fetchRepo( 308 + peer.pdsEndpoint, 309 + did, 310 + since, 311 + ); 312 + } catch { 313 + // Try next peer 314 + } 315 + } 316 + 317 + throw originalError; 318 + } 319 + 320 + /** 246 321 * Sync all configured DIDs. 247 322 * 248 323 * When a PolicyEngine is present: ··· 277 352 this.syncStorage.updateStatus(did, "error", message); 278 353 } 279 354 } 355 + 356 + // Discover peer endpoints for P2P fallback 357 + await this.discoverPeerEndpoints(); 280 358 281 359 // Re-run offer discovery to pick up new/revoked offers 282 360 await this.runOfferDiscovery(); ··· 368 446 did, 369 447 since, 370 448 ); 371 - } catch (err) { 372 - // On failure, clear cached peer info and try without `since` 449 + } catch (sourceErr) { 450 + // On failure, clear cached peer info 373 451 this.syncStorage.clearPeerInfo(did); 452 + const err = sourceErr instanceof Error ? sourceErr : new Error(String(sourceErr)); 374 453 if (since) { 375 - // Retry full sync 376 - carBytes = await this.repoFetcher.fetchRepo(pdsEndpoint, did); 454 + // Retry full sync from source, then fall back to peers 455 + try { 456 + carBytes = await this.repoFetcher.fetchRepo(pdsEndpoint, did); 457 + } catch { 458 + carBytes = await this.fetchFromPeersOrThrow(did, since, err); 459 + } 377 460 } else { 378 - throw err; 461 + carBytes = await this.fetchFromPeersOrThrow(did, undefined, err); 379 462 } 380 463 } 381 464
+547
src/replication/replication.test.ts
··· 1756 1756 expect(rev).toMatch(/^[a-z2-7]{13}$/); 1757 1757 }); 1758 1758 }); 1759 + 1760 + // ============================================ 1761 + // SyncStorage: Peer Endpoint Tracking 1762 + // ============================================ 1763 + 1764 + describe("SyncStorage peer endpoint tracking", () => { 1765 + let tmpDir: string; 1766 + let db: InstanceType<typeof Database>; 1767 + let storage: SyncStorage; 1768 + 1769 + beforeEach(() => { 1770 + tmpDir = mkdtempSync(join(tmpdir(), "peer-endpoint-test-")); 1771 + db = new Database(join(tmpDir, "test.db")); 1772 + storage = new SyncStorage(db); 1773 + storage.initSchema(); 1774 + }); 1775 + 1776 + afterEach(() => { 1777 + db.close(); 1778 + try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} 1779 + }); 1780 + 1781 + it("upsertPeerEndpoint + getPeerEndpoints returns entries", () => { 1782 + storage.upsertPeerEndpoint( 1783 + "did:plc:target1", 1784 + "did:plc:peer1", 1785 + "https://peer1.example.com", 1786 + "rev123", 1787 + ); 1788 + const peers = storage.getPeerEndpoints("did:plc:target1"); 1789 + expect(peers).toHaveLength(1); 1790 + expect(peers[0]!.peerDid).toBe("did:plc:peer1"); 1791 + expect(peers[0]!.pdsEndpoint).toBe("https://peer1.example.com"); 1792 + expect(peers[0]!.lastSyncRev).toBe("rev123"); 1793 + }); 1794 + 1795 + it("duplicate peer_did+target_did upsert updates existing", () => { 1796 + storage.upsertPeerEndpoint( 1797 + "did:plc:target1", 1798 + "did:plc:peer1", 1799 + "https://old.example.com", 1800 + "rev1", 1801 + ); 1802 + storage.upsertPeerEndpoint( 1803 + "did:plc:target1", 1804 + "did:plc:peer1", 1805 + "https://new.example.com", 1806 + "rev2", 1807 + ); 1808 + const peers = storage.getPeerEndpoints("did:plc:target1"); 1809 + expect(peers).toHaveLength(1); 1810 + expect(peers[0]!.pdsEndpoint).toBe("https://new.example.com"); 1811 + expect(peers[0]!.lastSyncRev).toBe("rev2"); 1812 + }); 1813 + 1814 + it("getPeerEndpoints returns empty for unknown DID", () => { 1815 + const peers = storage.getPeerEndpoints("did:plc:unknown"); 1816 + expect(peers).toEqual([]); 1817 + }); 1818 + 1819 + it("clearPeerEndpoints removes all entries for a DID", () => { 1820 + storage.upsertPeerEndpoint( 1821 + "did:plc:target1", 1822 + "did:plc:peer1", 1823 + "https://peer1.example.com", 1824 + null, 1825 + ); 1826 + storage.upsertPeerEndpoint( 1827 + "did:plc:target1", 1828 + "did:plc:peer2", 1829 + "https://peer2.example.com", 1830 + null, 1831 + ); 1832 + storage.upsertPeerEndpoint( 1833 + "did:plc:target2", 1834 + "did:plc:peer1", 1835 + "https://peer1.example.com", 1836 + null, 1837 + ); 1838 + 1839 + storage.clearPeerEndpoints("did:plc:target1"); 1840 + expect(storage.getPeerEndpoints("did:plc:target1")).toEqual([]); 1841 + // Other target DID's entries should remain 1842 + expect(storage.getPeerEndpoints("did:plc:target2")).toHaveLength(1); 1843 + }); 1844 + }); 1845 + 1846 + // ============================================ 1847 + // XRPC: getRepo / getBlocks for replicated DIDs 1848 + // ============================================ 1849 + 1850 + describe("XRPC: getRepo + getBlocks for replicated DIDs", () => { 1851 + let tmpDir: string; 1852 + let sourceDb: InstanceType<typeof Database>; 1853 + let replicaDb: InstanceType<typeof Database>; 1854 + let sourceIpfs: IpfsService; 1855 + let replicaIpfs: IpfsService; 1856 + let sourceRepo: RepoManager; 1857 + let replicaRepo: RepoManager; 1858 + let syncStorage: SyncStorage; 1859 + let app: ReturnType<typeof createApp>; 1860 + const sourceDid = "did:plc:test123"; 1861 + const replicaDid = "did:plc:replica456"; 1862 + let trackedCids: string[] = []; 1863 + 1864 + beforeEach(async () => { 1865 + tmpDir = mkdtempSync(join(tmpdir(), "xrpc-getrepo-test-")); 1866 + 1867 + // Source setup 1868 + const sourceConfig = testConfig(join(tmpDir, "source"), []); 1869 + sourceDb = new Database(join(tmpDir, "source.db")); 1870 + sourceIpfs = new IpfsService({ 1871 + blocksPath: join(tmpDir, "source-ipfs-blocks"), 1872 + datastorePath: join(tmpDir, "source-ipfs-datastore"), 1873 + networking: false, 1874 + }); 1875 + await sourceIpfs.start(); 1876 + sourceRepo = new RepoManager(sourceDb, sourceConfig); 1877 + sourceRepo.init(undefined, sourceIpfs, sourceIpfs); 1878 + 1879 + // Replica setup 1880 + const replicaConfig = testConfig(join(tmpDir, "replica"), [sourceDid]); 1881 + replicaConfig.DID = replicaDid; 1882 + replicaConfig.SIGNING_KEY = 1883 + "0000000000000000000000000000000000000000000000000000000000000002"; 1884 + replicaDb = new Database(join(tmpDir, "replica.db")); 1885 + replicaIpfs = new IpfsService({ 1886 + blocksPath: join(tmpDir, "replica-ipfs-blocks"), 1887 + datastorePath: join(tmpDir, "replica-ipfs-datastore"), 1888 + networking: false, 1889 + }); 1890 + await replicaIpfs.start(); 1891 + replicaRepo = new RepoManager(replicaDb, replicaConfig); 1892 + replicaRepo.init(undefined, replicaIpfs, replicaIpfs); 1893 + 1894 + syncStorage = new SyncStorage(replicaDb); 1895 + syncStorage.initSchema(); 1896 + 1897 + // Create a mock replicationManager with getSyncStorage 1898 + const mockReplicationManager = { 1899 + getSyncStorage: () => syncStorage, 1900 + } as unknown as import("./replication-manager.js").ReplicationManager; 1901 + 1902 + const firehose = new Firehose(replicaRepo); 1903 + app = createApp( 1904 + replicaConfig, 1905 + replicaRepo, 1906 + firehose, 1907 + replicaIpfs, // blockStore 1908 + replicaIpfs, // networkService 1909 + undefined, // blobStore 1910 + mockReplicationManager, 1911 + undefined, // replicatedRepoReader 1912 + ); 1913 + }); 1914 + 1915 + afterEach(async () => { 1916 + if (sourceIpfs.isRunning()) await sourceIpfs.stop(); 1917 + if (replicaIpfs.isRunning()) await replicaIpfs.stop(); 1918 + sourceDb.close(); 1919 + replicaDb.close(); 1920 + try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} 1921 + }); 1922 + 1923 + async function replicateSource(): Promise<void> { 1924 + const carBytes = await sourceRepo.getRepoCar(); 1925 + const { root, blocks } = await readCarWithRoot(carBytes); 1926 + await replicaIpfs.putBlocks(blocks); 1927 + 1928 + const rootCidStr = root.toString(); 1929 + const internalMap = ( 1930 + blocks as unknown as { map: Map<string, Uint8Array> } 1931 + ).map; 1932 + let rev = rootCidStr; 1933 + const commitBytes = internalMap?.get(rootCidStr); 1934 + if (commitBytes) { 1935 + const commitObj = cborDecode(commitBytes) as Record<string, unknown>; 1936 + if (typeof commitObj.rev === "string") { 1937 + rev = commitObj.rev; 1938 + } 1939 + } 1940 + 1941 + // Track block CIDs 1942 + trackedCids = Array.from(internalMap.keys()); 1943 + 1944 + syncStorage.upsertState({ 1945 + did: sourceDid, 1946 + pdsEndpoint: "https://pds.example.com", 1947 + }); 1948 + syncStorage.updateSyncProgress(sourceDid, rev, rootCidStr); 1949 + syncStorage.trackBlocks(sourceDid, trackedCids); 1950 + } 1951 + 1952 + it("getRepo serves replicated DID repo as valid CAR", async () => { 1953 + await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1954 + $type: "app.bsky.feed.post", 1955 + text: "replicated getRepo test", 1956 + createdAt: new Date().toISOString(), 1957 + }); 1958 + await replicateSource(); 1959 + 1960 + const res = await app.request( 1961 + `/xrpc/com.atproto.sync.getRepo?did=${sourceDid}`, 1962 + undefined, 1963 + {}, 1964 + ); 1965 + expect(res.status).toBe(200); 1966 + expect(res.headers.get("Content-Type")).toBe("application/vnd.ipld.car"); 1967 + 1968 + // Parse the CAR and verify it has a valid root + blocks 1969 + const carBytes = new Uint8Array(await res.arrayBuffer()); 1970 + const { root, blocks } = await readCarWithRoot(carBytes); 1971 + expect(root).toBeDefined(); 1972 + const internalMap = ( 1973 + blocks as unknown as { map: Map<string, Uint8Array> } 1974 + ).map; 1975 + expect(internalMap.size).toBeGreaterThan(0); 1976 + }); 1977 + 1978 + it("getRepo returns 404 for non-replicated DID", async () => { 1979 + const res = await app.request( 1980 + `/xrpc/com.atproto.sync.getRepo?did=did:plc:nonexistent`, 1981 + undefined, 1982 + {}, 1983 + ); 1984 + expect(res.status).toBe(404); 1985 + }); 1986 + 1987 + it("getRepo returns 404 for replicated DID with no rootCid yet", async () => { 1988 + // Create state without rootCid 1989 + syncStorage.upsertState({ 1990 + did: sourceDid, 1991 + pdsEndpoint: "https://pds.example.com", 1992 + }); 1993 + 1994 + const res = await app.request( 1995 + `/xrpc/com.atproto.sync.getRepo?did=${sourceDid}`, 1996 + undefined, 1997 + {}, 1998 + ); 1999 + expect(res.status).toBe(404); 2000 + }); 2001 + 2002 + it("getBlocks serves requested blocks for replicated DID", async () => { 2003 + await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 2004 + $type: "app.bsky.feed.post", 2005 + text: "replicated getBlocks test", 2006 + createdAt: new Date().toISOString(), 2007 + }); 2008 + await replicateSource(); 2009 + 2010 + // Request a subset of tracked CIDs 2011 + const requestCids = trackedCids.slice(0, 2); 2012 + const cidsQuery = requestCids.map((c) => `cids=${c}`).join("&"); 2013 + const res = await app.request( 2014 + `/xrpc/com.atproto.sync.getBlocks?did=${sourceDid}&${cidsQuery}`, 2015 + undefined, 2016 + {}, 2017 + ); 2018 + expect(res.status).toBe(200); 2019 + expect(res.headers.get("Content-Type")).toBe("application/vnd.ipld.car"); 2020 + 2021 + const carBytes = new Uint8Array(await res.arrayBuffer()); 2022 + const { blocks } = await readCarWithRoot(carBytes); 2023 + const internalMap = ( 2024 + blocks as unknown as { map: Map<string, Uint8Array> } 2025 + ).map; 2026 + // Should contain the requested blocks 2027 + for (const cid of requestCids) { 2028 + expect(internalMap.has(cid)).toBe(true); 2029 + } 2030 + }); 2031 + 2032 + it("getBlocks returns 404 for non-tracked DID", async () => { 2033 + const res = await app.request( 2034 + `/xrpc/com.atproto.sync.getBlocks?did=did:plc:nonexistent&cids=bafytest`, 2035 + undefined, 2036 + {}, 2037 + ); 2038 + expect(res.status).toBe(404); 2039 + }); 2040 + 2041 + it("getBlocks only serves blocks tracked for the requested DID", async () => { 2042 + await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 2043 + $type: "app.bsky.feed.post", 2044 + text: "security test", 2045 + createdAt: new Date().toISOString(), 2046 + }); 2047 + await replicateSource(); 2048 + 2049 + // Store a block that is NOT tracked for sourceDid 2050 + const untrackedBytes = new TextEncoder().encode("untracked-block"); 2051 + const untrackedCid = await makeCidStr(untrackedBytes); 2052 + await replicaIpfs.putBlock(untrackedCid, untrackedBytes); 2053 + 2054 + // Request the untracked CID for the tracked DID 2055 + const res = await app.request( 2056 + `/xrpc/com.atproto.sync.getBlocks?did=${sourceDid}&cids=${untrackedCid}`, 2057 + undefined, 2058 + {}, 2059 + ); 2060 + expect(res.status).toBe(200); 2061 + // The CAR should be returned but the untracked block should not be in it 2062 + const carBytes = new Uint8Array(await res.arrayBuffer()); 2063 + const { blocks } = await readCarWithRoot(carBytes); 2064 + const internalMap = ( 2065 + blocks as unknown as { map: Map<string, Uint8Array> } 2066 + ).map; 2067 + expect(internalMap.has(untrackedCid)).toBe(false); 2068 + }); 2069 + }); 2070 + 2071 + // ============================================ 2072 + // Peer fallback sync 2073 + // ============================================ 2074 + 2075 + describe("Peer fallback in syncDid", () => { 2076 + let tmpDir: string; 2077 + let db: InstanceType<typeof Database>; 2078 + let ipfsService: IpfsService; 2079 + let repoManager: RepoManager; 2080 + let syncStorage: SyncStorage; 2081 + const localDid = "did:plc:local"; 2082 + const remoteDid = "did:plc:remote1"; 2083 + 2084 + beforeEach(async () => { 2085 + tmpDir = mkdtempSync(join(tmpdir(), "peer-fallback-test-")); 2086 + const config = testConfig(join(tmpDir, "data"), [remoteDid]); 2087 + config.DID = localDid; 2088 + 2089 + db = new Database(join(tmpDir, "test.db")); 2090 + ipfsService = new IpfsService({ 2091 + blocksPath: join(tmpDir, "ipfs-blocks"), 2092 + datastorePath: join(tmpDir, "ipfs-datastore"), 2093 + networking: false, 2094 + }); 2095 + await ipfsService.start(); 2096 + 2097 + repoManager = new RepoManager(db, config); 2098 + repoManager.init(undefined, ipfsService, ipfsService); 2099 + 2100 + syncStorage = new SyncStorage(db); 2101 + syncStorage.initSchema(); 2102 + }); 2103 + 2104 + afterEach(async () => { 2105 + if (ipfsService.isRunning()) await ipfsService.stop(); 2106 + db.close(); 2107 + try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} 2108 + }); 2109 + 2110 + it("source PDS fails → fetches from peer endpoint successfully", async () => { 2111 + // Create a real CAR from the local repo to serve as peer response 2112 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 2113 + $type: "app.bsky.feed.post", 2114 + text: "peer fallback test", 2115 + createdAt: new Date().toISOString(), 2116 + }); 2117 + const carBytes = await repoManager.getRepoCar(); 2118 + 2119 + // Set up sync state 2120 + syncStorage.upsertState({ 2121 + did: remoteDid, 2122 + pdsEndpoint: "https://source-pds.example.com", 2123 + }); 2124 + 2125 + // Register a peer endpoint 2126 + syncStorage.upsertPeerEndpoint( 2127 + remoteDid, 2128 + "did:plc:peer1", 2129 + "https://peer1.example.com", 2130 + "rev1", 2131 + ); 2132 + 2133 + // Mock RepoFetcher: source fails, peer succeeds 2134 + const mockDidResolver = { 2135 + resolve: async (did: string) => ({ 2136 + id: did, 2137 + service: [ 2138 + { 2139 + id: "#atproto_pds", 2140 + type: "AtprotoPersonalDataServer", 2141 + serviceEndpoint: "https://source-pds.example.com", 2142 + }, 2143 + ], 2144 + }), 2145 + }; 2146 + const { RepoFetcher: RF } = await import("./repo-fetcher.js"); 2147 + const fetcher = new RF(mockDidResolver as any); 2148 + const originalFetchRepo = fetcher.fetchRepo.bind(fetcher); 2149 + 2150 + let callCount = 0; 2151 + fetcher.fetchRepo = async (endpoint: string, did: string, since?: string) => { 2152 + callCount++; 2153 + if (endpoint === "https://source-pds.example.com") { 2154 + throw new Error("Source PDS unreachable"); 2155 + } 2156 + if (endpoint === "https://peer1.example.com") { 2157 + return carBytes; 2158 + } 2159 + return originalFetchRepo(endpoint, did, since); 2160 + }; 2161 + 2162 + // Access private method via prototype 2163 + const { ReplicationManager: RM } = await import("./replication-manager.js"); 2164 + const manager = new RM( 2165 + db, 2166 + testConfig(join(tmpDir, "data"), [remoteDid]), 2167 + repoManager, 2168 + ipfsService, 2169 + ipfsService, 2170 + mockDidResolver as any, 2171 + ); 2172 + 2173 + // Replace the internal repoFetcher with our mock 2174 + (manager as any).repoFetcher = fetcher; 2175 + (manager as any).syncStorage = syncStorage; 2176 + 2177 + // Should succeed via peer fallback 2178 + await manager.syncDid(remoteDid); 2179 + 2180 + // Source PDS was tried (at least once), and peer was used 2181 + expect(callCount).toBeGreaterThanOrEqual(2); 2182 + 2183 + // Verify sync state was updated 2184 + const state = syncStorage.getState(remoteDid); 2185 + expect(state).not.toBeNull(); 2186 + expect(state!.status).toBe("synced"); 2187 + }); 2188 + 2189 + it("source PDS fails + all peers fail → throws original error", async () => { 2190 + syncStorage.upsertState({ 2191 + did: remoteDid, 2192 + pdsEndpoint: "https://source-pds.example.com", 2193 + }); 2194 + 2195 + // Register a peer that will also fail 2196 + syncStorage.upsertPeerEndpoint( 2197 + remoteDid, 2198 + "did:plc:peer1", 2199 + "https://peer1.example.com", 2200 + null, 2201 + ); 2202 + 2203 + const mockDidResolver = { 2204 + resolve: async (did: string) => ({ 2205 + id: did, 2206 + service: [ 2207 + { 2208 + id: "#atproto_pds", 2209 + type: "AtprotoPersonalDataServer", 2210 + serviceEndpoint: "https://source-pds.example.com", 2211 + }, 2212 + ], 2213 + }), 2214 + }; 2215 + 2216 + const { RepoFetcher: RF } = await import("./repo-fetcher.js"); 2217 + const fetcher = new RF(mockDidResolver as any); 2218 + fetcher.fetchRepo = async () => { 2219 + throw new Error("Connection refused"); 2220 + }; 2221 + 2222 + const { ReplicationManager: RM } = await import("./replication-manager.js"); 2223 + const manager = new RM( 2224 + db, 2225 + testConfig(join(tmpDir, "data"), [remoteDid]), 2226 + repoManager, 2227 + ipfsService, 2228 + ipfsService, 2229 + mockDidResolver as any, 2230 + ); 2231 + (manager as any).repoFetcher = fetcher; 2232 + (manager as any).syncStorage = syncStorage; 2233 + 2234 + // Should throw the original error 2235 + await expect(manager.syncDid(remoteDid)).rejects.toThrow("Connection refused"); 2236 + }); 2237 + 2238 + it("source PDS fails without since → tries peers without since", async () => { 2239 + // Create a real CAR 2240 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 2241 + $type: "app.bsky.feed.post", 2242 + text: "no-since test", 2243 + createdAt: new Date().toISOString(), 2244 + }); 2245 + const carBytes = await repoManager.getRepoCar(); 2246 + 2247 + // Set up sync state WITHOUT a previous rev (no since) 2248 + syncStorage.upsertState({ 2249 + did: remoteDid, 2250 + pdsEndpoint: "https://source-pds.example.com", 2251 + }); 2252 + 2253 + syncStorage.upsertPeerEndpoint( 2254 + remoteDid, 2255 + "did:plc:peer1", 2256 + "https://peer1.example.com", 2257 + null, 2258 + ); 2259 + 2260 + const mockDidResolver = { 2261 + resolve: async (did: string) => ({ 2262 + id: did, 2263 + service: [ 2264 + { 2265 + id: "#atproto_pds", 2266 + type: "AtprotoPersonalDataServer", 2267 + serviceEndpoint: "https://source-pds.example.com", 2268 + }, 2269 + ], 2270 + }), 2271 + }; 2272 + 2273 + const { RepoFetcher: RF } = await import("./repo-fetcher.js"); 2274 + const fetcher = new RF(mockDidResolver as any); 2275 + 2276 + let peerSincePassed: string | undefined = "NOT_CALLED"; 2277 + fetcher.fetchRepo = async (endpoint: string, did: string, since?: string) => { 2278 + if (endpoint === "https://source-pds.example.com") { 2279 + throw new Error("Source PDS down"); 2280 + } 2281 + if (endpoint === "https://peer1.example.com") { 2282 + peerSincePassed = since; 2283 + return carBytes; 2284 + } 2285 + throw new Error("Unknown endpoint"); 2286 + }; 2287 + 2288 + const { ReplicationManager: RM } = await import("./replication-manager.js"); 2289 + const manager = new RM( 2290 + db, 2291 + testConfig(join(tmpDir, "data"), [remoteDid]), 2292 + repoManager, 2293 + ipfsService, 2294 + ipfsService, 2295 + mockDidResolver as any, 2296 + ); 2297 + (manager as any).repoFetcher = fetcher; 2298 + (manager as any).syncStorage = syncStorage; 2299 + 2300 + await manager.syncDid(remoteDid); 2301 + 2302 + // Peer should have been called without `since` (undefined) 2303 + expect(peerSincePassed).toBeUndefined(); 2304 + }); 2305 + });
+68
src/replication/sync-storage.ts
··· 63 63 ); 64 64 `); 65 65 66 + // Peer endpoints table: tracks which peers have which DIDs for P2P fallback fetch. 67 + this.db.exec(` 68 + CREATE TABLE IF NOT EXISTS peer_endpoints ( 69 + target_did TEXT NOT NULL, 70 + peer_did TEXT NOT NULL, 71 + pds_endpoint TEXT NOT NULL, 72 + last_sync_rev TEXT, 73 + discovered_at TEXT NOT NULL DEFAULT (datetime('now')), 74 + PRIMARY KEY (target_did, peer_did) 75 + ); 76 + `); 77 + 66 78 // Migration: add root_cid column if missing (for existing databases) 67 79 const columns = this.db 68 80 .prepare("PRAGMA table_info(replication_state)") ··· 386 398 ) 387 399 .get(did) as { count: number }; 388 400 return row.count; 401 + } 402 + 403 + // ============================================ 404 + // Peer endpoint tracking (for P2P fallback fetch) 405 + // ============================================ 406 + 407 + /** 408 + * Upsert a peer endpoint entry for a target DID. 409 + */ 410 + upsertPeerEndpoint( 411 + targetDid: string, 412 + peerDid: string, 413 + pdsEndpoint: string, 414 + lastSyncRev: string | null, 415 + ): void { 416 + this.db 417 + .prepare( 418 + `INSERT INTO peer_endpoints (target_did, peer_did, pds_endpoint, last_sync_rev) 419 + VALUES (?, ?, ?, ?) 420 + ON CONFLICT(target_did, peer_did) DO UPDATE SET 421 + pds_endpoint = excluded.pds_endpoint, 422 + last_sync_rev = excluded.last_sync_rev, 423 + discovered_at = datetime('now')`, 424 + ) 425 + .run(targetDid, peerDid, pdsEndpoint, lastSyncRev); 426 + } 427 + 428 + /** 429 + * Get all known peer endpoints for a target DID. 430 + */ 431 + getPeerEndpoints( 432 + targetDid: string, 433 + ): Array<{ peerDid: string; pdsEndpoint: string; lastSyncRev: string | null }> { 434 + const rows = this.db 435 + .prepare( 436 + "SELECT peer_did, pds_endpoint, last_sync_rev FROM peer_endpoints WHERE target_did = ?", 437 + ) 438 + .all(targetDid) as Array<{ 439 + peer_did: string; 440 + pds_endpoint: string; 441 + last_sync_rev: string | null; 442 + }>; 443 + return rows.map((r) => ({ 444 + peerDid: r.peer_did, 445 + pdsEndpoint: r.pds_endpoint, 446 + lastSyncRev: r.last_sync_rev, 447 + })); 448 + } 449 + 450 + /** 451 + * Clear all peer endpoint entries for a target DID. 452 + */ 453 + clearPeerEndpoints(targetDid: string): void { 454 + this.db 455 + .prepare("DELETE FROM peer_endpoints WHERE target_did = ?") 456 + .run(targetDid); 389 457 } 390 458 391 459 private rowToState(row: Record<string, unknown>): SyncState {
+83 -26
src/xrpc/sync.ts
··· 6 6 import type { ReplicatedRepoReader } from "../replication/replicated-repo-reader.js"; 7 7 import type { BlockStore } from "../ipfs.js"; 8 8 import type { SyncStorage } from "../replication/sync-storage.js"; 9 + import { BlockMap, blocksToCarFile } from "@atproto/repo"; 10 + import { CID } from "@atproto/lex-data"; 9 11 10 12 export async function getRepo( 11 13 c: Context<AppEnv>, 12 14 repoManager: RepoManager, 15 + blockStore?: BlockStore, 16 + syncStorage?: SyncStorage, 13 17 ): Promise<Response> { 14 18 const did = c.req.query("did"); 15 19 ··· 27 31 ); 28 32 } 29 33 30 - if (did !== c.env.DID) { 31 - return c.json( 32 - { error: "RepoNotFound", message: `Repository not found for DID: ${did}` }, 33 - 404, 34 - ); 34 + // Local DID: serve from RepoManager 35 + if (did === c.env.DID) { 36 + const carBytes = await repoManager.getRepoCar(); 37 + return new Response(carBytes, { 38 + status: 200, 39 + headers: { 40 + "Content-Type": "application/vnd.ipld.car", 41 + "Content-Length": carBytes.length.toString(), 42 + }, 43 + }); 35 44 } 36 45 37 - const carBytes = await repoManager.getRepoCar(); 46 + // Replicated DID: serve from BlockStore 47 + if (blockStore && syncStorage) { 48 + const state = syncStorage.getState(did); 49 + if (state?.rootCid) { 50 + const blockCids = syncStorage.getBlockCids(did); 51 + const blocks = new BlockMap(); 52 + for (const cidStr of blockCids) { 53 + const bytes = await blockStore.getBlock(cidStr); 54 + if (bytes) { 55 + blocks.set(CID.parse(cidStr), bytes); 56 + } 57 + } 58 + const root = CID.parse(state.rootCid); 59 + const carBytes = await blocksToCarFile(root, blocks); 60 + return new Response(carBytes, { 61 + status: 200, 62 + headers: { 63 + "Content-Type": "application/vnd.ipld.car", 64 + "Content-Length": carBytes.length.toString(), 65 + }, 66 + }); 67 + } 68 + } 38 69 39 - return new Response(carBytes, { 40 - status: 200, 41 - headers: { 42 - "Content-Type": "application/vnd.ipld.car", 43 - "Content-Length": carBytes.length.toString(), 44 - }, 45 - }); 70 + return c.json( 71 + { error: "RepoNotFound", message: `Repository not found for DID: ${did}` }, 72 + 404, 73 + ); 46 74 } 47 75 48 76 export async function getRepoStatus( ··· 178 206 export async function getBlocks( 179 207 c: Context<AppEnv>, 180 208 repoManager: RepoManager, 209 + blockStore?: BlockStore, 210 + syncStorage?: SyncStorage, 181 211 ): Promise<Response> { 182 212 const did = c.req.query("did"); 183 213 const cidsParam = c.req.queries("cids"); ··· 203 233 ); 204 234 } 205 235 206 - if (did !== c.env.DID) { 207 - return c.json( 208 - { error: "RepoNotFound", message: `Repository not found for DID: ${did}` }, 209 - 404, 210 - ); 236 + // Local DID: serve from RepoManager 237 + if (did === c.env.DID) { 238 + const carBytes = await repoManager.getBlocks(cidsParam); 239 + return new Response(carBytes, { 240 + status: 200, 241 + headers: { 242 + "Content-Type": "application/vnd.ipld.car", 243 + "Content-Length": carBytes.length.toString(), 244 + }, 245 + }); 211 246 } 212 247 213 - const carBytes = await repoManager.getBlocks(cidsParam); 248 + // Replicated DID: serve requested blocks from BlockStore 249 + if (blockStore && syncStorage) { 250 + const state = syncStorage.getState(did); 251 + if (state?.rootCid) { 252 + // Security: only serve blocks tracked for this DID 253 + const trackedCids = new Set(syncStorage.getBlockCids(did)); 254 + const blocks = new BlockMap(); 255 + for (const cidStr of cidsParam) { 256 + if (trackedCids.has(cidStr)) { 257 + const bytes = await blockStore.getBlock(cidStr); 258 + if (bytes) { 259 + blocks.set(CID.parse(cidStr), bytes); 260 + } 261 + } 262 + } 263 + const root = CID.parse(state.rootCid); 264 + const carBytes = await blocksToCarFile(root, blocks); 265 + return new Response(carBytes, { 266 + status: 200, 267 + headers: { 268 + "Content-Type": "application/vnd.ipld.car", 269 + "Content-Length": carBytes.length.toString(), 270 + }, 271 + }); 272 + } 273 + } 214 274 215 - return new Response(carBytes, { 216 - status: 200, 217 - headers: { 218 - "Content-Type": "application/vnd.ipld.car", 219 - "Content-Length": carBytes.length.toString(), 220 - }, 221 - }); 275 + return c.json( 276 + { error: "RepoNotFound", message: `Repository not found for DID: ${did}` }, 277 + 404, 278 + ); 222 279 } 223 280 224 281 export async function getBlob(