atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add P2P record replication base layer

Implement the core replication loop: announce, discover, replicate. Nodes
publish AT Protocol records declaring their IPFS PeerID (org.p2pds.peer)
and which DIDs they replicate (org.p2pds.manifest). Other nodes discover
this info, fetch repos via CAR export, store blocks in IPFS, and verify
availability.

New modules: types, sync-storage, repo-fetcher, peer-discovery,
verification, and replication-manager orchestrator. Adds REPLICATE_DIDS
config, getMultiaddrs() to IpfsService, replication status/syncNow XRPC
endpoints, and 27 tests covering all components plus integration CAR
roundtrip.

+1464 -2
+2
src/config.ts
··· 15 15 PORT: number; 16 16 IPFS_ENABLED: boolean; 17 17 IPFS_NETWORKING: boolean; 18 + REPLICATE_DIDS: string[]; 18 19 } 19 20 20 21 const REQUIRED_KEYS = [ ··· 96 97 PORT: parseInt(process.env.PORT ?? "3000", 10), 97 98 IPFS_ENABLED: process.env.IPFS_ENABLED !== "false", 98 99 IPFS_NETWORKING: process.env.IPFS_NETWORKING !== "false", 100 + REPLICATE_DIDS: (process.env.REPLICATE_DIDS ?? "").split(",").map(s => s.trim()).filter(Boolean), 99 101 }; 100 102 }
+23
src/index.ts
··· 6 6 import type { Config } from "./config.js"; 7 7 import type { IpfsService } from "./ipfs.js"; 8 8 import type { BlobStore } from "./blobs.js"; 9 + import type { ReplicationManager } from "./replication/replication-manager.js"; 9 10 import * as sync from "./xrpc/sync.js"; 10 11 import * as repo from "./xrpc/repo.js"; 11 12 import * as server from "./xrpc/server.js"; ··· 22 23 firehose: Firehose, 23 24 ipfsService?: IpfsService, 24 25 blobStore?: BlobStore, 26 + replicationManager?: ReplicationManager, 25 27 ) { 26 28 const app = new Hono<{ Bindings: Config }>(); 27 29 ··· 374 376 ); 375 377 }, 376 378 ); 379 + 380 + // ============================================ 381 + // Replication status 382 + // ============================================ 383 + app.get("/xrpc/org.p2pds.replication.getStatus", requireAuth, (c) => { 384 + if (!replicationManager) { 385 + return c.json({ error: "ReplicationNotEnabled", message: "Replication is not enabled" }, 400); 386 + } 387 + return c.json({ states: replicationManager.getSyncStates() }); 388 + }); 389 + 390 + app.post("/xrpc/org.p2pds.replication.syncNow", requireAuth, async (c) => { 391 + if (!replicationManager) { 392 + return c.json({ error: "ReplicationNotEnabled", message: "Replication is not enabled" }, 400); 393 + } 394 + // Trigger sync in background, return immediately 395 + replicationManager.syncAll().catch((err) => { 396 + console.error("Manual sync error:", err); 397 + }); 398 + return c.json({ message: "Sync triggered" }); 399 + }); 377 400 378 401 return app; 379 402 }
+2 -1
src/ipfs.test.ts
··· 40 40 PORT: 3000, 41 41 IPFS_ENABLED: true, 42 42 IPFS_NETWORKING: false, 43 + REPLICATE_DIDS: [], 43 44 }; 44 45 } 45 46 ··· 299 300 ); 300 301 expect(res.status).toBe(404); 301 302 302 - const json = await res.json(); 303 + const json = (await res.json()) as { error: string }; 303 304 expect(json.error).toBe("BlockNotFound"); 304 305 }); 305 306
+5
src/ipfs.ts
··· 147 147 // Future: publish CBOR message { did, commit, rev, time, peer } via gossipsub 148 148 } 149 149 150 + getMultiaddrs(): string[] { 151 + if (!this.helia) return []; 152 + return this.helia.libp2p.getMultiaddrs().map(ma => ma.toString()); 153 + } 154 + 150 155 getPeerId(): string | null { 151 156 if (!this.helia) return null; 152 157 return this.helia.libp2p.peerId.toString();
+78
src/replication/peer-discovery.ts
··· 1 + /** 2 + * Discover peer IPFS identities via AT Protocol records. 3 + */ 4 + 5 + import type { RepoFetcher } from "./repo-fetcher.js"; 6 + import { PEER_NSID, MANIFEST_NSID, type ManifestRecord } from "./types.js"; 7 + 8 + export interface PeerInfo { 9 + pdsEndpoint: string; 10 + peerId: string | null; 11 + multiaddrs: string[]; 12 + } 13 + 14 + export class PeerDiscovery { 15 + constructor(private repoFetcher: RepoFetcher) {} 16 + 17 + /** 18 + * Discover a peer's IPFS identity via their AT Protocol records. 19 + * 1. Resolve DID -> PDS endpoint 20 + * 2. Fetch org.p2pds.peer/self record 21 + * 3. Return PeerID + multiaddrs 22 + */ 23 + async discoverPeer(did: string): Promise<PeerInfo | null> { 24 + const pdsEndpoint = await this.repoFetcher.resolvePds(did); 25 + if (!pdsEndpoint) return null; 26 + 27 + const record = await this.repoFetcher.fetchRecord( 28 + pdsEndpoint, 29 + did, 30 + PEER_NSID, 31 + "self", 32 + ); 33 + 34 + if (!record || typeof record !== "object") { 35 + return { 36 + pdsEndpoint, 37 + peerId: null, 38 + multiaddrs: [], 39 + }; 40 + } 41 + 42 + const peerRecord = record as Record<string, unknown>; 43 + return { 44 + pdsEndpoint, 45 + peerId: 46 + typeof peerRecord.peerId === "string" 47 + ? peerRecord.peerId 48 + : null, 49 + multiaddrs: Array.isArray(peerRecord.multiaddrs) 50 + ? (peerRecord.multiaddrs as string[]) 51 + : [], 52 + }; 53 + } 54 + 55 + /** 56 + * Discover what DIDs a peer claims to replicate. 57 + * Fetches all org.p2pds.manifest records from their PDS. 58 + */ 59 + async discoverManifests( 60 + did: string, 61 + pdsEndpoint: string, 62 + ): Promise<ManifestRecord[]> { 63 + const records = await this.repoFetcher.listRecords( 64 + pdsEndpoint, 65 + did, 66 + MANIFEST_NSID, 67 + ); 68 + 69 + return records 70 + .map((r) => r.value) 71 + .filter( 72 + (v): v is ManifestRecord => 73 + typeof v === "object" && 74 + v !== null && 75 + (v as Record<string, unknown>).$type === MANIFEST_NSID, 76 + ); 77 + } 78 + }
+290
src/replication/replication-manager.ts
/**
 * Main replication orchestrator.
 * Publishes peer identity + manifest records, syncs remote repos to IPFS.
 *
 * Flow per DID (see syncDid): resolve PDS -> optionally refresh peer info ->
 * fetch repo CAR (incremental when a prior rev exists) -> store blocks in
 * IPFS -> announce + verify -> record sync progress and update the manifest.
 */

import type Database from "better-sqlite3";
import type { Config } from "../config.js";
import type { RepoManager } from "../repo-manager.js";
import type { IpfsService } from "../ipfs.js";
import type { DidResolver } from "../did-resolver.js";
import { readCarWithRoot } from "@atproto/repo";

import {
  PEER_NSID,
  MANIFEST_NSID,
  didToRkey,
  type PeerIdentityRecord,
  type ManifestRecord,
  type SyncState,
} from "./types.js";
import { SyncStorage } from "./sync-storage.js";
import { RepoFetcher } from "./repo-fetcher.js";
import { PeerDiscovery } from "./peer-discovery.js";
import { BlockVerifier } from "./verification.js";

/** How old cached peer info can be before re-fetching (1 hour). */
const PEER_INFO_TTL_MS = 60 * 60 * 1000;

export class ReplicationManager {
  private syncStorage: SyncStorage;
  private repoFetcher: RepoFetcher;
  private peerDiscovery: PeerDiscovery;
  private verifier: BlockVerifier;
  // Interval handle for periodic sync; null when periodic sync is not running.
  private syncTimer: ReturnType<typeof setInterval> | null = null;
  // Cooperative stop flag checked between DIDs and inside timer callbacks.
  private stopped = false;

  constructor(
    db: Database.Database,
    private config: Config,
    private repoManager: RepoManager,
    private ipfsService: IpfsService,
    private didResolver: DidResolver,
  ) {
    this.syncStorage = new SyncStorage(db);
    this.repoFetcher = new RepoFetcher(didResolver);
    this.peerDiscovery = new PeerDiscovery(this.repoFetcher);
    this.verifier = new BlockVerifier(ipfsService);
  }

  /**
   * Initialize replication: create tables, publish identity, sync manifests.
   */
  async init(): Promise<void> {
    this.syncStorage.initSchema();
    await this.publishPeerIdentity();
    await this.syncManifests();
  }

  /**
   * Publish (or update) the org.p2pds.peer/self record with our IPFS PeerID.
   * No-op if networking is disabled (getPeerId returns null).
   */
  async publishPeerIdentity(): Promise<void> {
    const peerId = this.ipfsService.getPeerId();
    if (!peerId) return; // networking disabled

    const record: PeerIdentityRecord = {
      $type: PEER_NSID,
      peerId,
      multiaddrs: this.ipfsService.getMultiaddrs(),
      createdAt: new Date().toISOString(),
    };

    // Fixed rkey "self": repeated publishes overwrite the same record.
    await this.repoManager.putRecord(PEER_NSID, "self", record);
  }

  /**
   * Ensure a manifest record exists for each configured DID.
   * Creates new manifests or updates existing ones.
   */
  async syncManifests(): Promise<void> {
    for (const did of this.config.REPLICATE_DIDS) {
      const rkey = didToRkey(did);

      // Check if manifest already exists
      const existing = await this.repoManager.getRecord(
        MANIFEST_NSID,
        rkey,
      );

      if (!existing) {
        const manifest: ManifestRecord = {
          $type: MANIFEST_NSID,
          subject: did,
          status: "active",
          lastSyncRev: null,
          lastSyncAt: null,
          createdAt: new Date().toISOString(),
        };

        await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest);
      }

      // Ensure sync state exists
      // (DIDs that fail to resolve here get no state row yet; syncDid will
      // report the resolution error on the next sync pass.)
      const pdsEndpoint = await this.repoFetcher.resolvePds(did);
      if (pdsEndpoint) {
        this.syncStorage.upsertState({
          did,
          pdsEndpoint,
        });
      }
    }
  }

  /**
   * Sync all configured DIDs.
   * DIDs are processed sequentially; a failure on one DID is recorded in its
   * sync state and does not abort the remaining DIDs.
   */
  async syncAll(): Promise<void> {
    for (const did of this.config.REPLICATE_DIDS) {
      if (this.stopped) break;
      try {
        await this.syncDid(did);
      } catch (err) {
        const message =
          err instanceof Error ? err.message : String(err);
        this.syncStorage.updateStatus(did, "error", message);
      }
    }
  }

  /**
   * Sync a single DID: fetch repo, store blocks in IPFS, verify, update state.
   *
   * @throws rethrows repo-fetch errors when there is no incremental fallback;
   *   syncAll catches these and records them as sync-state errors.
   */
  async syncDid(did: string): Promise<void> {
    this.syncStorage.updateStatus(did, "syncing");

    // 1. Resolve PDS endpoint
    let state = this.syncStorage.getState(did);
    let pdsEndpoint = await this.repoFetcher.resolvePds(did);

    if (!pdsEndpoint) {
      this.syncStorage.updateStatus(did, "error", "Could not resolve PDS endpoint");
      return;
    }

    // Update PDS endpoint if it changed
    this.syncStorage.upsertState({ did, pdsEndpoint });
    state = this.syncStorage.getState(did);

    // 2. Optionally refresh peer info
    if (state && this.shouldRefreshPeerInfo(state)) {
      try {
        const peerInfo = await this.peerDiscovery.discoverPeer(did);
        if (peerInfo) {
          this.syncStorage.updatePeerInfo(did, peerInfo.peerId);
        }
      } catch {
        // Non-fatal: peer discovery is optional
      }
    }

    // 3. Fetch repo (with incremental sync if we have a previous rev)
    const since = state?.lastSyncRev ?? undefined;
    let carBytes: Uint8Array;
    try {
      carBytes = await this.repoFetcher.fetchRepo(
        pdsEndpoint,
        did,
        since,
      );
    } catch (err) {
      // On failure, clear cached peer info and try without `since`
      // NOTE(review): the fetch failure may be unrelated to the cached peer
      // record — confirm clearing peer info here (forcing rediscovery on the
      // next cycle) is intentional rather than incidental.
      this.syncStorage.clearPeerInfo(did);
      if (since) {
        // Retry full sync
        carBytes = await this.repoFetcher.fetchRepo(pdsEndpoint, did);
      } else {
        throw err;
      }
    }

    // 4. Parse CAR and store blocks
    const { root, blocks } = await readCarWithRoot(carBytes);

    await this.ipfsService.putBlocks(blocks);

    // 5. Collect CID strings for DHT announcement + verification
    // NOTE(review): this reaches into BlockMap's private `map` field via a
    // double cast — it depends on @atproto/repo internals; verify against the
    // pinned version (a public iteration API would be safer if available).
    const cidStrs: string[] = [];
    const internalMap = (
      blocks as unknown as { map: Map<string, Uint8Array> }
    ).map;
    if (internalMap) {
      for (const cidStr of internalMap.keys()) {
        cidStrs.push(cidStr);
      }
    }

    // 6. Announce to DHT (fire-and-forget)
    this.ipfsService.provideBlocks(cidStrs).catch(() => {});

    // 7. Verify block availability
    const verification =
      await this.verifier.verifyBlockAvailability(cidStrs);
    if (verification.missing.length > 0) {
      console.warn(
        `Verification: ${verification.missing.length}/${verification.checked} blocks missing for ${did}`,
      );
    }
    this.syncStorage.updateVerifiedAt(did);

    // 8. Determine rev from the root commit
    // The root CID string serves as a rev identifier when we can't extract the actual rev
    const rev = root.toString();

    // 9. Update sync state
    this.syncStorage.updateSyncProgress(did, rev);

    // 10. Update manifest record
    const rkey = didToRkey(did);
    const existingManifest = await this.repoManager.getRecord(
      MANIFEST_NSID,
      rkey,
    );
    if (existingManifest) {
      const manifest: ManifestRecord = {
        $type: MANIFEST_NSID,
        subject: did,
        status: "active",
        lastSyncRev: rev,
        lastSyncAt: new Date().toISOString(),
        // Preserve the original creation time when present on the old record.
        createdAt:
          (existingManifest.record as Record<string, unknown>)
            ?.createdAt as string ?? new Date().toISOString(),
      };
      await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest);
    }
  }

  /**
   * Start periodic sync at the given interval.
   * Idempotent: a second call while a timer is active is a no-op.
   *
   * @param intervalMs - Milliseconds between sync passes (default 5 minutes).
   */
  startPeriodicSync(intervalMs: number = 5 * 60 * 1000): void {
    if (this.syncTimer) return;
    this.stopped = false;

    // Run first sync after a short delay to let startup complete
    // (this timeout handle is not tracked; stop() relies on the `stopped`
    // flag to suppress the callback rather than cancelling the timer)
    setTimeout(() => {
      if (!this.stopped) {
        this.syncAll().catch((err) => {
          console.error("Periodic sync error:", err);
        });
      }
    }, 5000);

    this.syncTimer = setInterval(() => {
      if (!this.stopped) {
        this.syncAll().catch((err) => {
          console.error("Periodic sync error:", err);
        });
      }
    }, intervalMs);
  }

  /**
   * Stop periodic sync.
   * Sets the cooperative stop flag (an in-flight syncAll finishes its current
   * DID, then exits) and clears the interval timer.
   */
  stop(): void {
    this.stopped = true;
    if (this.syncTimer) {
      clearInterval(this.syncTimer);
      this.syncTimer = null;
    }
  }

  /**
   * Get sync states for all tracked DIDs.
   */
  getSyncStates(): SyncState[] {
    return this.syncStorage.getAllStates();
  }

  /**
   * Check if peer info should be refreshed based on TTL.
   * Returns true when it was never fetched or the cached copy is older than
   * PEER_INFO_TTL_MS.
   */
  private shouldRefreshPeerInfo(state: SyncState): boolean {
    if (!state.peerInfoFetchedAt) return true;
    const fetchedAt = new Date(state.peerInfoFetchedAt).getTime();
    return Date.now() - fetchedAt > PEER_INFO_TTL_MS;
  }
}
+636
src/replication/replication.test.ts
··· 1 + import { describe, it, expect, beforeEach, afterEach } from "vitest"; 2 + import { mkdtempSync, rmSync } from "node:fs"; 3 + import { tmpdir } from "node:os"; 4 + import { join } from "node:path"; 5 + import Database from "better-sqlite3"; 6 + import { IpfsService } from "../ipfs.js"; 7 + import { RepoManager } from "../repo-manager.js"; 8 + import type { Config } from "../config.js"; 9 + import { BlockMap, readCarWithRoot } from "@atproto/repo"; 10 + import { CID } from "@atproto/lex-data"; 11 + import { 12 + create as createCid, 13 + CODEC_RAW, 14 + toString as cidToString, 15 + } from "@atcute/cid"; 16 + 17 + import { SyncStorage } from "./sync-storage.js"; 18 + import { 19 + didToRkey, 20 + rkeyToDid, 21 + PEER_NSID, 22 + MANIFEST_NSID, 23 + } from "./types.js"; 24 + import { BlockVerifier } from "./verification.js"; 25 + import { RepoFetcher, extractPdsEndpoint } from "./repo-fetcher.js"; 26 + import { PeerDiscovery } from "./peer-discovery.js"; 27 + 28 + /** Create a CID string from raw bytes using SHA-256. 
*/ 29 + async function makeCidStr(bytes: Uint8Array): Promise<string> { 30 + const cid = await createCid(CODEC_RAW, bytes); 31 + return cidToString(cid); 32 + } 33 + 34 + function testConfig(dataDir: string, replicateDids: string[] = []): Config { 35 + return { 36 + DID: "did:plc:test123", 37 + HANDLE: "test.example.com", 38 + PDS_HOSTNAME: "test.example.com", 39 + AUTH_TOKEN: "test-auth-token", 40 + SIGNING_KEY: 41 + "0000000000000000000000000000000000000000000000000000000000000001", 42 + SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr", 43 + JWT_SECRET: "test-jwt-secret", 44 + PASSWORD_HASH: "$2a$10$test", 45 + DATA_DIR: dataDir, 46 + PORT: 3000, 47 + IPFS_ENABLED: true, 48 + IPFS_NETWORKING: false, 49 + REPLICATE_DIDS: replicateDids, 50 + }; 51 + } 52 + 53 + // ============================================ 54 + // didToRkey / rkeyToDid 55 + // ============================================ 56 + 57 + describe("didToRkey", () => { 58 + it("replaces colons with hyphens", () => { 59 + expect(didToRkey("did:plc:abc123")).toBe("did-plc-abc123"); 60 + }); 61 + 62 + it("handles did:web", () => { 63 + expect(didToRkey("did:web:example.com")).toBe("did-web-example.com"); 64 + }); 65 + 66 + it("roundtrips with rkeyToDid for simple DIDs", () => { 67 + const did = "did:plc:abc123"; 68 + const rkey = didToRkey(did); 69 + // rkeyToDid replaces ALL hyphens, so roundtrip only works for DIDs without hyphens 70 + expect(rkeyToDid(rkey)).toBe(did); 71 + }); 72 + }); 73 + 74 + // ============================================ 75 + // SyncStorage 76 + // ============================================ 77 + 78 + describe("SyncStorage", () => { 79 + let tmpDir: string; 80 + let db: InstanceType<typeof Database>; 81 + let storage: SyncStorage; 82 + 83 + beforeEach(() => { 84 + tmpDir = mkdtempSync(join(tmpdir(), "sync-storage-test-")); 85 + db = new Database(join(tmpDir, "test.db")); 86 + storage = new SyncStorage(db); 87 + storage.initSchema(); 88 + }); 89 + 90 + 
afterEach(() => { 91 + db.close(); 92 + rmSync(tmpDir, { recursive: true, force: true }); 93 + }); 94 + 95 + it("creates table and retrieves empty states", () => { 96 + const states = storage.getAllStates(); 97 + expect(states).toEqual([]); 98 + }); 99 + 100 + it("upserts and retrieves state", () => { 101 + storage.upsertState({ 102 + did: "did:plc:test1", 103 + pdsEndpoint: "https://pds.example.com", 104 + }); 105 + 106 + const state = storage.getState("did:plc:test1"); 107 + expect(state).not.toBeNull(); 108 + expect(state!.did).toBe("did:plc:test1"); 109 + expect(state!.pdsEndpoint).toBe("https://pds.example.com"); 110 + expect(state!.status).toBe("pending"); 111 + expect(state!.peerId).toBeNull(); 112 + }); 113 + 114 + it("updates status", () => { 115 + storage.upsertState({ 116 + did: "did:plc:test1", 117 + pdsEndpoint: "https://pds.example.com", 118 + }); 119 + 120 + storage.updateStatus("did:plc:test1", "syncing"); 121 + expect(storage.getState("did:plc:test1")!.status).toBe("syncing"); 122 + }); 123 + 124 + it("updates status with error message", () => { 125 + storage.upsertState({ 126 + did: "did:plc:test1", 127 + pdsEndpoint: "https://pds.example.com", 128 + }); 129 + 130 + storage.updateStatus("did:plc:test1", "error", "Connection refused"); 131 + const state = storage.getState("did:plc:test1")!; 132 + expect(state.status).toBe("error"); 133 + expect(state.errorMessage).toBe("Connection refused"); 134 + }); 135 + 136 + it("updates sync progress", () => { 137 + storage.upsertState({ 138 + did: "did:plc:test1", 139 + pdsEndpoint: "https://pds.example.com", 140 + }); 141 + 142 + storage.updateSyncProgress("did:plc:test1", "rev123"); 143 + const state = storage.getState("did:plc:test1")!; 144 + expect(state.lastSyncRev).toBe("rev123"); 145 + expect(state.lastSyncAt).not.toBeNull(); 146 + expect(state.status).toBe("synced"); 147 + }); 148 + 149 + it("updates and clears peer info", () => { 150 + storage.upsertState({ 151 + did: "did:plc:test1", 152 + 
pdsEndpoint: "https://pds.example.com", 153 + }); 154 + 155 + storage.updatePeerInfo("did:plc:test1", "12D3KooWTest"); 156 + let state = storage.getState("did:plc:test1")!; 157 + expect(state.peerId).toBe("12D3KooWTest"); 158 + expect(state.peerInfoFetchedAt).not.toBeNull(); 159 + 160 + storage.clearPeerInfo("did:plc:test1"); 161 + state = storage.getState("did:plc:test1")!; 162 + expect(state.peerId).toBeNull(); 163 + expect(state.peerInfoFetchedAt).toBeNull(); 164 + }); 165 + 166 + it("getAllStates returns all entries sorted by DID", () => { 167 + storage.upsertState({ 168 + did: "did:plc:bbb", 169 + pdsEndpoint: "https://b.example.com", 170 + }); 171 + storage.upsertState({ 172 + did: "did:plc:aaa", 173 + pdsEndpoint: "https://a.example.com", 174 + }); 175 + 176 + const states = storage.getAllStates(); 177 + expect(states).toHaveLength(2); 178 + expect(states[0]!.did).toBe("did:plc:aaa"); 179 + expect(states[1]!.did).toBe("did:plc:bbb"); 180 + }); 181 + }); 182 + 183 + // ============================================ 184 + // Record Publishing (peer identity + manifests) 185 + // ============================================ 186 + 187 + describe("Record publishing", () => { 188 + let tmpDir: string; 189 + let db: InstanceType<typeof Database>; 190 + let ipfsService: IpfsService; 191 + let repoManager: RepoManager; 192 + 193 + beforeEach(async () => { 194 + tmpDir = mkdtempSync(join(tmpdir(), "repl-publish-test-")); 195 + const config = testConfig(tmpDir, ["did:plc:remote1", "did:plc:remote2"]); 196 + 197 + db = new Database(join(tmpDir, "test.db")); 198 + ipfsService = new IpfsService({ 199 + blocksPath: join(tmpDir, "ipfs-blocks"), 200 + datastorePath: join(tmpDir, "ipfs-datastore"), 201 + networking: false, 202 + }); 203 + await ipfsService.start(); 204 + 205 + repoManager = new RepoManager(db, config); 206 + repoManager.init(undefined, ipfsService); 207 + }); 208 + 209 + afterEach(async () => { 210 + if (ipfsService.isRunning()) { 211 + await 
ipfsService.stop(); 212 + } 213 + db.close(); 214 + rmSync(tmpDir, { recursive: true, force: true }); 215 + }); 216 + 217 + it("publishPeerIdentity is no-op when networking is off", async () => { 218 + // getPeerId() returns null when networking=false 219 + expect(ipfsService.getPeerId()).toBeNull(); 220 + 221 + // Manually call what ReplicationManager.publishPeerIdentity does 222 + const peerId = ipfsService.getPeerId(); 223 + if (peerId) { 224 + await repoManager.putRecord(PEER_NSID, "self", { 225 + $type: PEER_NSID, 226 + peerId, 227 + multiaddrs: [], 228 + createdAt: new Date().toISOString(), 229 + }); 230 + } 231 + 232 + // No record should have been created 233 + const record = await repoManager.getRecord(PEER_NSID, "self"); 234 + expect(record).toBeNull(); 235 + }); 236 + 237 + it("syncManifests creates manifest records for each configured DID", async () => { 238 + const dids = ["did:plc:remote1", "did:plc:remote2"]; 239 + for (const did of dids) { 240 + const rkey = didToRkey(did); 241 + await repoManager.putRecord(MANIFEST_NSID, rkey, { 242 + $type: MANIFEST_NSID, 243 + subject: did, 244 + status: "active", 245 + lastSyncRev: null, 246 + lastSyncAt: null, 247 + createdAt: new Date().toISOString(), 248 + }); 249 + } 250 + 251 + // Both should be readable 252 + for (const did of dids) { 253 + const rkey = didToRkey(did); 254 + const result = await repoManager.getRecord(MANIFEST_NSID, rkey); 255 + expect(result).not.toBeNull(); 256 + expect((result!.record as Record<string, unknown>).subject).toBe(did); 257 + expect((result!.record as Record<string, unknown>).status).toBe("active"); 258 + } 259 + }); 260 + 261 + it("syncManifests is idempotent", async () => { 262 + const did = "did:plc:remote1"; 263 + const rkey = didToRkey(did); 264 + const manifest = { 265 + $type: MANIFEST_NSID, 266 + subject: did, 267 + status: "active", 268 + lastSyncRev: null, 269 + lastSyncAt: null, 270 + createdAt: new Date().toISOString(), 271 + }; 272 + 273 + // Write twice 274 + 
await repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 275 + await repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 276 + 277 + // Should still be readable, no error 278 + const result = await repoManager.getRecord(MANIFEST_NSID, rkey); 279 + expect(result).not.toBeNull(); 280 + }); 281 + }); 282 + 283 + // ============================================ 284 + // RepoFetcher 285 + // ============================================ 286 + 287 + describe("RepoFetcher", () => { 288 + it("resolvePds with mock DidResolver returning test DID document", async () => { 289 + const mockDidResolver = { 290 + resolve: async (did: string) => { 291 + if (did === "did:plc:test1") { 292 + return { 293 + id: "did:plc:test1", 294 + service: [ 295 + { 296 + id: "#atproto_pds", 297 + type: "AtprotoPersonalDataServer", 298 + serviceEndpoint: "https://pds.example.com", 299 + }, 300 + ], 301 + }; 302 + } 303 + return null; 304 + }, 305 + }; 306 + 307 + const fetcher = new RepoFetcher(mockDidResolver as any); 308 + const pds = await fetcher.resolvePds("did:plc:test1"); 309 + expect(pds).toBe("https://pds.example.com"); 310 + }); 311 + 312 + it("returns null for unresolvable DID", async () => { 313 + const mockDidResolver = { 314 + resolve: async () => null, 315 + }; 316 + 317 + const fetcher = new RepoFetcher(mockDidResolver as any); 318 + const pds = await fetcher.resolvePds("did:plc:unknown"); 319 + expect(pds).toBeNull(); 320 + }); 321 + }); 322 + 323 + describe("extractPdsEndpoint", () => { 324 + it("extracts service endpoint from DID document", () => { 325 + const doc = { 326 + id: "did:plc:test1", 327 + service: [ 328 + { 329 + id: "#atproto_pds", 330 + type: "AtprotoPersonalDataServer", 331 + serviceEndpoint: "https://pds.example.com", 332 + }, 333 + ], 334 + }; 335 + expect(extractPdsEndpoint(doc as any)).toBe("https://pds.example.com"); 336 + }); 337 + 338 + it("returns null when no service array", () => { 339 + const doc = { id: "did:plc:test1" }; 340 + 
expect(extractPdsEndpoint(doc as any)).toBeNull(); 341 + }); 342 + 343 + it("returns null when no matching service", () => { 344 + const doc = { 345 + id: "did:plc:test1", 346 + service: [ 347 + { 348 + id: "#other", 349 + type: "Other", 350 + serviceEndpoint: "https://other.example.com", 351 + }, 352 + ], 353 + }; 354 + expect(extractPdsEndpoint(doc as any)).toBeNull(); 355 + }); 356 + }); 357 + 358 + // ============================================ 359 + // PeerDiscovery 360 + // ============================================ 361 + 362 + describe("PeerDiscovery", () => { 363 + it("returns null for unresolvable DID", async () => { 364 + const mockFetcher = { 365 + resolvePds: async () => null, 366 + fetchRecord: async () => null, 367 + listRecords: async () => [], 368 + }; 369 + 370 + const discovery = new PeerDiscovery(mockFetcher as any); 371 + const result = await discovery.discoverPeer("did:plc:unknown"); 372 + expect(result).toBeNull(); 373 + }); 374 + 375 + it("returns pdsEndpoint with null peerId when no peer record", async () => { 376 + const mockFetcher = { 377 + resolvePds: async () => "https://pds.example.com", 378 + fetchRecord: async () => null, 379 + listRecords: async () => [], 380 + }; 381 + 382 + const discovery = new PeerDiscovery(mockFetcher as any); 383 + const result = await discovery.discoverPeer("did:plc:test1"); 384 + expect(result).not.toBeNull(); 385 + expect(result!.pdsEndpoint).toBe("https://pds.example.com"); 386 + expect(result!.peerId).toBeNull(); 387 + expect(result!.multiaddrs).toEqual([]); 388 + }); 389 + 390 + it("returns peer info when record exists", async () => { 391 + const mockFetcher = { 392 + resolvePds: async () => "https://pds.example.com", 393 + fetchRecord: async () => ({ 394 + $type: PEER_NSID, 395 + peerId: "12D3KooWTest", 396 + multiaddrs: ["/ip4/127.0.0.1/tcp/4001"], 397 + createdAt: new Date().toISOString(), 398 + }), 399 + listRecords: async () => [], 400 + }; 401 + 402 + const discovery = new 
PeerDiscovery(mockFetcher as any); 403 + const result = await discovery.discoverPeer("did:plc:test1"); 404 + expect(result).not.toBeNull(); 405 + expect(result!.peerId).toBe("12D3KooWTest"); 406 + expect(result!.multiaddrs).toEqual(["/ip4/127.0.0.1/tcp/4001"]); 407 + }); 408 + }); 409 + 410 + // ============================================ 411 + // BlockVerifier 412 + // ============================================ 413 + 414 + describe("BlockVerifier", () => { 415 + let tmpDir: string; 416 + let ipfsService: IpfsService; 417 + 418 + beforeEach(async () => { 419 + tmpDir = mkdtempSync(join(tmpdir(), "verifier-test-")); 420 + ipfsService = new IpfsService({ 421 + blocksPath: join(tmpDir, "blocks"), 422 + datastorePath: join(tmpDir, "datastore"), 423 + networking: false, 424 + }); 425 + await ipfsService.start(); 426 + }); 427 + 428 + afterEach(async () => { 429 + if (ipfsService.isRunning()) { 430 + await ipfsService.stop(); 431 + } 432 + rmSync(tmpDir, { recursive: true, force: true }); 433 + }); 434 + 435 + it("all blocks available reports 100%", async () => { 436 + const verifier = new BlockVerifier(ipfsService); 437 + 438 + const cids: string[] = []; 439 + for (let i = 0; i < 3; i++) { 440 + const bytes = new TextEncoder().encode(`block-${i}`); 441 + const cidStr = await makeCidStr(bytes); 442 + await ipfsService.putBlock(cidStr, bytes); 443 + cids.push(cidStr); 444 + } 445 + 446 + const result = await verifier.verifyBlockAvailability(cids, 10); 447 + expect(result.checked).toBe(3); 448 + expect(result.available).toBe(3); 449 + expect(result.missing).toEqual([]); 450 + }); 451 + 452 + it("some blocks missing reports correctly", async () => { 453 + const verifier = new BlockVerifier(ipfsService); 454 + 455 + const bytes1 = new TextEncoder().encode("present"); 456 + const cid1 = await makeCidStr(bytes1); 457 + await ipfsService.putBlock(cid1, bytes1); 458 + 459 + const bytes2 = new TextEncoder().encode("missing"); 460 + const cid2 = await makeCidStr(bytes2); 461 + 
// Don't store cid2 462 + 463 + const result = await verifier.verifyBlockAvailability( 464 + [cid1, cid2], 465 + 10, 466 + ); 467 + expect(result.checked).toBe(2); 468 + expect(result.available).toBe(1); 469 + expect(result.missing).toContain(cid2); 470 + }); 471 + 472 + it("sample size > array length checks all", async () => { 473 + const verifier = new BlockVerifier(ipfsService); 474 + 475 + const bytes = new TextEncoder().encode("single"); 476 + const cidStr = await makeCidStr(bytes); 477 + await ipfsService.putBlock(cidStr, bytes); 478 + 479 + const result = await verifier.verifyBlockAvailability([cidStr], 100); 480 + expect(result.checked).toBe(1); 481 + expect(result.available).toBe(1); 482 + }); 483 + 484 + it("empty array returns zeros", async () => { 485 + const verifier = new BlockVerifier(ipfsService); 486 + 487 + const result = await verifier.verifyBlockAvailability([]); 488 + expect(result.checked).toBe(0); 489 + expect(result.available).toBe(0); 490 + expect(result.missing).toEqual([]); 491 + }); 492 + }); 493 + 494 + // ============================================ 495 + // Integration: repo sync (two in-process repos) 496 + // ============================================ 497 + 498 + describe("Integration: repo sync via CAR roundtrip", () => { 499 + let tmpDir: string; 500 + let sourceDb: InstanceType<typeof Database>; 501 + let replicaDb: InstanceType<typeof Database>; 502 + let sourceIpfs: IpfsService; 503 + let replicaIpfs: IpfsService; 504 + let sourceRepo: RepoManager; 505 + let replicaRepo: RepoManager; 506 + 507 + beforeEach(async () => { 508 + tmpDir = mkdtempSync(join(tmpdir(), "repl-integration-test-")); 509 + 510 + // Source setup 511 + const sourceConfig = testConfig(join(tmpDir, "source"), []); 512 + sourceDb = new Database(join(tmpDir, "source.db")); 513 + sourceIpfs = new IpfsService({ 514 + blocksPath: join(tmpDir, "source-ipfs-blocks"), 515 + datastorePath: join(tmpDir, "source-ipfs-datastore"), 516 + networking: false, 517 + }); 518 
+ await sourceIpfs.start(); 519 + sourceRepo = new RepoManager(sourceDb, sourceConfig); 520 + sourceRepo.init(undefined, sourceIpfs); 521 + 522 + // Replica setup (different DID for the local node, but will replicate source's data) 523 + const replicaConfig = testConfig(join(tmpDir, "replica"), [ 524 + sourceConfig.DID, 525 + ]); 526 + replicaConfig.DID = "did:plc:replica456"; 527 + replicaConfig.SIGNING_KEY = 528 + "0000000000000000000000000000000000000000000000000000000000000002"; 529 + replicaDb = new Database(join(tmpDir, "replica.db")); 530 + replicaIpfs = new IpfsService({ 531 + blocksPath: join(tmpDir, "replica-ipfs-blocks"), 532 + datastorePath: join(tmpDir, "replica-ipfs-datastore"), 533 + networking: false, 534 + }); 535 + await replicaIpfs.start(); 536 + replicaRepo = new RepoManager(replicaDb, replicaConfig); 537 + replicaRepo.init(undefined, replicaIpfs); 538 + }); 539 + 540 + afterEach(async () => { 541 + if (sourceIpfs.isRunning()) await sourceIpfs.stop(); 542 + if (replicaIpfs.isRunning()) await replicaIpfs.stop(); 543 + sourceDb.close(); 544 + replicaDb.close(); 545 + rmSync(tmpDir, { recursive: true, force: true }); 546 + }); 547 + 548 + it("source records can be replicated via CAR export/import to IPFS", async () => { 549 + // 1. Create records in source 550 + await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 551 + $type: "app.bsky.feed.post", 552 + text: "Hello from source!", 553 + createdAt: new Date().toISOString(), 554 + }); 555 + await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 556 + $type: "app.bsky.feed.post", 557 + text: "Second post", 558 + createdAt: new Date().toISOString(), 559 + }); 560 + 561 + // 2. Export as CAR 562 + const carBytes = await sourceRepo.getRepoCar(); 563 + expect(carBytes.length).toBeGreaterThan(0); 564 + 565 + // 3. Parse CAR on replica side 566 + const { root, blocks } = await readCarWithRoot(carBytes); 567 + expect(root).toBeDefined(); 568 + 569 + // 4. 
Store blocks in replica's IPFS 570 + await replicaIpfs.putBlocks(blocks); 571 + 572 + // 5. Verify blocks are retrievable from replica's IPFS 573 + const internalMap = ( 574 + blocks as unknown as { map: Map<string, Uint8Array> } 575 + ).map; 576 + expect(internalMap).toBeDefined(); 577 + expect(internalMap.size).toBeGreaterThan(0); 578 + 579 + for (const cidStr of internalMap.keys()) { 580 + const has = await replicaIpfs.hasBlock(cidStr); 581 + expect(has).toBe(true); 582 + } 583 + 584 + // 6. Verify block availability 585 + const verifier = new BlockVerifier(replicaIpfs); 586 + const cidStrs = Array.from(internalMap.keys()); 587 + const verification = 588 + await verifier.verifyBlockAvailability(cidStrs); 589 + expect(verification.available).toBe(verification.checked); 590 + expect(verification.missing).toEqual([]); 591 + }); 592 + 593 + it("manifest record updated with sync rev after replication", async () => { 594 + // Create a manifest record in replica 595 + const sourceDid = "did:plc:test123"; 596 + const rkey = didToRkey(sourceDid); 597 + await replicaRepo.putRecord(MANIFEST_NSID, rkey, { 598 + $type: MANIFEST_NSID, 599 + subject: sourceDid, 600 + status: "active", 601 + lastSyncRev: null, 602 + lastSyncAt: null, 603 + createdAt: new Date().toISOString(), 604 + }); 605 + 606 + // Simulate sync: create records in source, export, import to IPFS 607 + await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 608 + $type: "app.bsky.feed.post", 609 + text: "test post", 610 + createdAt: new Date().toISOString(), 611 + }); 612 + 613 + const carBytes = await sourceRepo.getRepoCar(); 614 + const { root, blocks } = await readCarWithRoot(carBytes); 615 + await replicaIpfs.putBlocks(blocks); 616 + 617 + // Update manifest with sync rev 618 + const syncRev = root.toString(); 619 + await replicaRepo.putRecord(MANIFEST_NSID, rkey, { 620 + $type: MANIFEST_NSID, 621 + subject: sourceDid, 622 + status: "active", 623 + lastSyncRev: syncRev, 624 + lastSyncAt: new 
Date().toISOString(), 625 + createdAt: new Date().toISOString(), 626 + }); 627 + 628 + // Verify manifest was updated 629 + const result = await replicaRepo.getRecord(MANIFEST_NSID, rkey); 630 + expect(result).not.toBeNull(); 631 + const record = result!.record as Record<string, unknown>; 632 + expect(record.lastSyncRev).toBe(syncRev); 633 + expect(record.lastSyncAt).not.toBeNull(); 634 + expect(record.status).toBe("active"); 635 + }); 636 + });
+117
src/replication/repo-fetcher.ts
··· 1 + /** 2 + * Fetch AT Protocol repos and records from remote PDSes. 3 + */ 4 + 5 + import type { DidResolver, DidDocument } from "../did-resolver.js"; 6 + 7 + export class RepoFetcher { 8 + constructor(private didResolver: DidResolver) {} 9 + 10 + /** 11 + * Resolve a DID to its PDS endpoint. 12 + * Extracts #atproto_pds serviceEndpoint from the DID document. 13 + */ 14 + async resolvePds(did: string): Promise<string | null> { 15 + const doc = await this.didResolver.resolve(did); 16 + if (!doc) return null; 17 + return extractPdsEndpoint(doc); 18 + } 19 + 20 + /** 21 + * Fetch a full repo as CAR bytes from a remote PDS. 22 + * Uses com.atproto.sync.getRepo XRPC endpoint. 23 + */ 24 + async fetchRepo( 25 + pdsEndpoint: string, 26 + did: string, 27 + since?: string, 28 + ): Promise<Uint8Array> { 29 + const url = new URL( 30 + "/xrpc/com.atproto.sync.getRepo", 31 + pdsEndpoint, 32 + ); 33 + url.searchParams.set("did", did); 34 + if (since) { 35 + url.searchParams.set("since", since); 36 + } 37 + 38 + const res = await fetch(url.toString()); 39 + if (!res.ok) { 40 + throw new Error( 41 + `Failed to fetch repo for ${did}: ${res.status} ${res.statusText}`, 42 + ); 43 + } 44 + 45 + return new Uint8Array(await res.arrayBuffer()); 46 + } 47 + 48 + /** 49 + * Fetch a single record from a remote PDS. 50 + * Uses com.atproto.repo.getRecord XRPC endpoint. 51 + */ 52 + async fetchRecord( 53 + pdsEndpoint: string, 54 + did: string, 55 + collection: string, 56 + rkey: string, 57 + ): Promise<unknown | null> { 58 + const url = new URL( 59 + "/xrpc/com.atproto.repo.getRecord", 60 + pdsEndpoint, 61 + ); 62 + url.searchParams.set("repo", did); 63 + url.searchParams.set("collection", collection); 64 + url.searchParams.set("rkey", rkey); 65 + 66 + const res = await fetch(url.toString()); 67 + if (!res.ok) return null; 68 + 69 + const json = (await res.json()) as { value?: unknown }; 70 + return json.value ?? 
null; 71 + } 72 + 73 + /** 74 + * List records in a collection from a remote PDS. 75 + * Uses com.atproto.repo.listRecords XRPC endpoint. 76 + */ 77 + async listRecords( 78 + pdsEndpoint: string, 79 + did: string, 80 + collection: string, 81 + ): Promise<Array<{ uri: string; cid: string; value: unknown }>> { 82 + const url = new URL( 83 + "/xrpc/com.atproto.repo.listRecords", 84 + pdsEndpoint, 85 + ); 86 + url.searchParams.set("repo", did); 87 + url.searchParams.set("collection", collection); 88 + url.searchParams.set("limit", "100"); 89 + 90 + const res = await fetch(url.toString()); 91 + if (!res.ok) return []; 92 + 93 + const json = (await res.json()) as { 94 + records?: Array<{ uri: string; cid: string; value: unknown }>; 95 + }; 96 + return json.records ?? []; 97 + } 98 + } 99 + 100 + /** 101 + * Extract the #atproto_pds service endpoint from a DID document. 102 + */ 103 + export function extractPdsEndpoint(doc: DidDocument): string | null { 104 + const services = doc.service; 105 + if (!Array.isArray(services)) return null; 106 + 107 + for (const svc of services) { 108 + const service = svc as unknown as Record<string, unknown>; 109 + if ( 110 + service.id === "#atproto_pds" && 111 + typeof service.serviceEndpoint === "string" 112 + ) { 113 + return service.serviceEndpoint; 114 + } 115 + } 116 + return null; 117 + }
+161
src/replication/sync-storage.ts
··· 1 + /** 2 + * SQLite-backed sync state storage for replication. 3 + * Tracks per-DID sync progress separately from the repo. 4 + */ 5 + 6 + import type Database from "better-sqlite3"; 7 + import type { SyncState } from "./types.js"; 8 + 9 + export class SyncStorage { 10 + constructor(private db: Database.Database) {} 11 + 12 + /** 13 + * Create the replication_state table if it doesn't exist. 14 + */ 15 + initSchema(): void { 16 + this.db.exec(` 17 + CREATE TABLE IF NOT EXISTS replication_state ( 18 + did TEXT PRIMARY KEY, 19 + pds_endpoint TEXT NOT NULL, 20 + peer_id TEXT, 21 + peer_info_fetched_at TEXT, 22 + last_sync_rev TEXT, 23 + last_sync_at TEXT, 24 + last_verified_at TEXT, 25 + status TEXT NOT NULL DEFAULT 'pending', 26 + error_message TEXT 27 + ); 28 + `); 29 + } 30 + 31 + /** 32 + * Insert or update sync state for a DID. 33 + */ 34 + upsertState(state: { 35 + did: string; 36 + pdsEndpoint: string; 37 + peerId?: string | null; 38 + status?: string; 39 + }): void { 40 + this.db 41 + .prepare( 42 + `INSERT INTO replication_state (did, pds_endpoint, peer_id, status) 43 + VALUES (?, ?, ?, ?) 44 + ON CONFLICT(did) DO UPDATE SET 45 + pds_endpoint = excluded.pds_endpoint, 46 + peer_id = COALESCE(excluded.peer_id, replication_state.peer_id), 47 + status = excluded.status`, 48 + ) 49 + .run( 50 + state.did, 51 + state.pdsEndpoint, 52 + state.peerId ?? null, 53 + state.status ?? "pending", 54 + ); 55 + } 56 + 57 + /** 58 + * Get sync state for a single DID. 59 + */ 60 + getState(did: string): SyncState | null { 61 + const row = this.db 62 + .prepare("SELECT * FROM replication_state WHERE did = ?") 63 + .get(did) as Record<string, unknown> | undefined; 64 + if (!row) return null; 65 + return this.rowToState(row); 66 + } 67 + 68 + /** 69 + * Get sync states for all tracked DIDs. 
70 + */ 71 + getAllStates(): SyncState[] { 72 + const rows = this.db 73 + .prepare("SELECT * FROM replication_state ORDER BY did") 74 + .all() as Array<Record<string, unknown>>; 75 + return rows.map((row) => this.rowToState(row)); 76 + } 77 + 78 + /** 79 + * Update sync progress after a successful sync. 80 + */ 81 + updateSyncProgress(did: string, rev: string): void { 82 + this.db 83 + .prepare( 84 + `UPDATE replication_state 85 + SET last_sync_rev = ?, last_sync_at = datetime('now'), 86 + status = 'synced', error_message = NULL 87 + WHERE did = ?`, 88 + ) 89 + .run(rev, did); 90 + } 91 + 92 + /** 93 + * Update status and optionally set an error message. 94 + */ 95 + updateStatus( 96 + did: string, 97 + status: SyncState["status"], 98 + errorMessage?: string, 99 + ): void { 100 + this.db 101 + .prepare( 102 + `UPDATE replication_state 103 + SET status = ?, error_message = ? 104 + WHERE did = ?`, 105 + ) 106 + .run(status, errorMessage ?? null, did); 107 + } 108 + 109 + /** 110 + * Update cached peer info for a DID. 111 + */ 112 + updatePeerInfo(did: string, peerId: string | null): void { 113 + this.db 114 + .prepare( 115 + `UPDATE replication_state 116 + SET peer_id = ?, peer_info_fetched_at = datetime('now') 117 + WHERE did = ?`, 118 + ) 119 + .run(peerId, did); 120 + } 121 + 122 + /** 123 + * Clear cached peer info (e.g. on connection failure). 124 + */ 125 + clearPeerInfo(did: string): void { 126 + this.db 127 + .prepare( 128 + `UPDATE replication_state 129 + SET peer_id = NULL, peer_info_fetched_at = NULL 130 + WHERE did = ?`, 131 + ) 132 + .run(did); 133 + } 134 + 135 + /** 136 + * Update the last verified timestamp. 
137 + */ 138 + updateVerifiedAt(did: string): void { 139 + this.db 140 + .prepare( 141 + `UPDATE replication_state 142 + SET last_verified_at = datetime('now') 143 + WHERE did = ?`, 144 + ) 145 + .run(did); 146 + } 147 + 148 + private rowToState(row: Record<string, unknown>): SyncState { 149 + return { 150 + did: row.did as string, 151 + pdsEndpoint: row.pds_endpoint as string, 152 + peerId: (row.peer_id as string) ?? null, 153 + peerInfoFetchedAt: (row.peer_info_fetched_at as string) ?? null, 154 + lastSyncRev: (row.last_sync_rev as string) ?? null, 155 + lastSyncAt: (row.last_sync_at as string) ?? null, 156 + lastVerifiedAt: (row.last_verified_at as string) ?? null, 157 + status: row.status as SyncState["status"], 158 + errorMessage: (row.error_message as string) ?? null, 159 + }; 160 + } 161 + }
+54
src/replication/types.ts
··· 1 + /** 2 + * Type definitions and constants for P2P replication. 3 + */ 4 + 5 + /** Lexicon NSIDs */ 6 + export const PEER_NSID = "org.p2pds.peer"; 7 + export const MANIFEST_NSID = "org.p2pds.manifest"; 8 + 9 + /** Peer identity record — binds a DID to an IPFS PeerID. */ 10 + export interface PeerIdentityRecord { 11 + $type: typeof PEER_NSID; 12 + peerId: string; 13 + multiaddrs: string[]; 14 + createdAt: string; 15 + } 16 + 17 + /** Replication manifest — declares that this node serves a given DID's data. */ 18 + export interface ManifestRecord { 19 + $type: typeof MANIFEST_NSID; 20 + subject: string; 21 + status: "active" | "paused"; 22 + lastSyncRev: string | null; 23 + lastSyncAt: string | null; 24 + createdAt: string; 25 + } 26 + 27 + /** Operational sync state tracked in SQLite (not in repo). */ 28 + export interface SyncState { 29 + did: string; 30 + pdsEndpoint: string; 31 + peerId: string | null; 32 + peerInfoFetchedAt: string | null; 33 + lastSyncRev: string | null; 34 + lastSyncAt: string | null; 35 + lastVerifiedAt: string | null; 36 + status: "pending" | "syncing" | "synced" | "error"; 37 + errorMessage: string | null; 38 + } 39 + 40 + /** 41 + * Convert a DID to a valid rkey by replacing colons with hyphens. 42 + * e.g. "did:plc:abc123" → "did-plc-abc123" 43 + */ 44 + export function didToRkey(did: string): string { 45 + return did.replace(/:/g, "-"); 46 + } 47 + 48 + /** 49 + * Convert an rkey back to a DID-like string by replacing hyphens with colons. 50 + * Note: only works for simple DIDs without hyphens in method-specific-id. 51 + */ 52 + export function rkeyToDid(rkey: string): string { 53 + return rkey.replace(/-/g, ":"); 54 + }
+62
src/replication/verification.ts
··· 1 + /** 2 + * Spot-check block availability in IPFS blockstore. 3 + */ 4 + 5 + import type { IpfsService } from "../ipfs.js"; 6 + 7 + export interface VerificationResult { 8 + checked: number; 9 + available: number; 10 + missing: string[]; 11 + } 12 + 13 + export class BlockVerifier { 14 + constructor(private ipfsService: IpfsService) {} 15 + 16 + /** 17 + * Verify that a random sample of blocks are available in our blockstore. 18 + * If sampleSize >= array length, checks all blocks. 19 + */ 20 + async verifyBlockAvailability( 21 + blockCids: string[], 22 + sampleSize: number = 5, 23 + ): Promise<VerificationResult> { 24 + if (blockCids.length === 0) { 25 + return { checked: 0, available: 0, missing: [] }; 26 + } 27 + 28 + // Sample randomly, or check all if sample >= total 29 + const toCheck = 30 + sampleSize >= blockCids.length 31 + ? [...blockCids] 32 + : this.randomSample(blockCids, sampleSize); 33 + 34 + const missing: string[] = []; 35 + let available = 0; 36 + 37 + for (const cid of toCheck) { 38 + const has = await this.ipfsService.hasBlock(cid); 39 + if (has) { 40 + available++; 41 + } else { 42 + missing.push(cid); 43 + } 44 + } 45 + 46 + return { 47 + checked: toCheck.length, 48 + available, 49 + missing, 50 + }; 51 + } 52 + 53 + private randomSample(arr: string[], size: number): string[] { 54 + const shuffled = [...arr]; 55 + // Fisher-Yates partial shuffle 56 + for (let i = shuffled.length - 1; i > 0 && i >= shuffled.length - size; i--) { 57 + const j = Math.floor(Math.random() * (i + 1)); 58 + [shuffled[i], shuffled[j]] = [shuffled[j]!, shuffled[i]!]; 59 + } 60 + return shuffled.slice(-size); 61 + } 62 + }
+34 -1
src/server.ts
··· 12 12 import { Firehose } from "./firehose.js"; 13 13 import { IpfsService } from "./ipfs.js"; 14 14 import { createApp } from "./index.js"; 15 + import { DidResolver } from "./did-resolver.js"; 16 + import { InMemoryDidCache } from "./did-cache.js"; 17 + import { ReplicationManager } from "./replication/replication-manager.js"; 15 18 16 19 // Load configuration 17 20 const config = loadConfig(); ··· 49 52 // Initialize firehose 50 53 const firehose = new Firehose(repoManager); 51 54 55 + // Initialize DID resolver 56 + const didResolver = new DidResolver({ 57 + didCache: new InMemoryDidCache(), 58 + }); 59 + 60 + // Initialize replication manager (if IPFS enabled and DIDs configured) 61 + let replicationManager: ReplicationManager | undefined; 62 + if (ipfsService && config.REPLICATE_DIDS.length > 0) { 63 + replicationManager = new ReplicationManager( 64 + db, 65 + config, 66 + repoManager, 67 + ipfsService, 68 + didResolver, 69 + ); 70 + } 71 + 52 72 // Create Hono app 53 - const app = createApp(config, repoManager, firehose, ipfsService, blobStore); 73 + const app = createApp(config, repoManager, firehose, ipfsService, blobStore, replicationManager); 54 74 55 75 // Create HTTP server using @hono/node-server's request listener 56 76 const requestListener = getRequestListener(app.fetch); ··· 117 137 console.log(pc.dim(` IPFS: local blockstore only (networking disabled)`)); 118 138 } 119 139 await backfillIpfs(); 140 + // Start replication after IPFS is ready 141 + if (replicationManager) { 142 + try { 143 + await replicationManager.init(); 144 + replicationManager.startPeriodicSync(); 145 + console.log(pc.dim(` Replication: tracking ${config.REPLICATE_DIDS.length} DIDs`)); 146 + } catch (err) { 147 + console.error(pc.red(` Replication startup failed:`), err); 148 + } 149 + } 120 150 } catch (err) { 121 151 console.error(pc.red(` IPFS startup failed:`), err); 122 152 } ··· 129 159 function shutdown() { 130 160 console.log(pc.dim("\nShutting down...")); 131 161 
const cleanup = async () => { 162 + if (replicationManager) { 163 + replicationManager.stop(); 164 + } 132 165 if (ipfsService) { 133 166 await ipfsService.stop(); 134 167 }