atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add blob replication: fetch, verify, store, and serve replicated blobs

After syncing repo blocks, walk the records to extract blob CIDs, fetch each
blob from the source PDS via com.atproto.sync.getBlob, verify it against its
CID, store it in the BlockStore (IPFS), and track it in the replication_blobs
table. The firehose path also triggers blob fetching for create/update ops.
The getBlob endpoint is extended to serve replicated blobs from the BlockStore.

+594 -31
+1 -1
src/index.ts
··· 213 213 sync.getBlocks(c, repoManager), 214 214 ); 215 215 app.get("/xrpc/com.atproto.sync.getBlob", (c) => 216 - sync.getBlob(c, repoManager), 216 + sync.getBlob(c, repoManager, blockStore, replicationManager?.getSyncStorage()), 217 217 ); 218 218 app.get("/xrpc/com.atproto.sync.listRepos", (c) => 219 219 sync.listRepos(c, repoManager, replicatedRepoReader),
+298
src/replication/blob-replication.test.ts
··· 1 + import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; 2 + import { mkdtempSync, rmSync } from "node:fs"; 3 + import { tmpdir } from "node:os"; 4 + import { join } from "node:path"; 5 + import Database from "better-sqlite3"; 6 + import { 7 + create as createCid, 8 + CODEC_RAW, 9 + toString as cidToString, 10 + } from "@atcute/cid"; 11 + 12 + import { SyncStorage } from "./sync-storage.js"; 13 + import { RepoFetcher } from "./repo-fetcher.js"; 14 + import { IpfsService } from "../ipfs.js"; 15 + import { detectContentType } from "../format.js"; 16 + 17 + /** Create a CID string from raw bytes using SHA-256. */ 18 + async function makeCidStr(bytes: Uint8Array): Promise<string> { 19 + const cid = await createCid(CODEC_RAW, bytes); 20 + return cidToString(cid); 21 + } 22 + 23 + // ============================================ 24 + // SyncStorage blob tracking 25 + // ============================================ 26 + 27 + describe("SyncStorage blob tracking", () => { 28 + let tmpDir: string; 29 + let db: InstanceType<typeof Database>; 30 + let storage: SyncStorage; 31 + 32 + beforeEach(() => { 33 + tmpDir = mkdtempSync(join(tmpdir(), "blob-sync-test-")); 34 + db = new Database(join(tmpDir, "test.db")); 35 + storage = new SyncStorage(db); 36 + storage.initSchema(); 37 + }); 38 + 39 + afterEach(() => { 40 + db.close(); 41 + try { 42 + rmSync(tmpDir, { recursive: true, force: true }); 43 + } catch {} 44 + }); 45 + 46 + it("trackBlobs inserts and hasBlobCid returns true", () => { 47 + const did = "did:plc:test1"; 48 + storage.trackBlobs(did, ["bafkreia", "bafkreib"]); 49 + expect(storage.hasBlobCid(did, "bafkreia")).toBe(true); 50 + expect(storage.hasBlobCid(did, "bafkreib")).toBe(true); 51 + }); 52 + 53 + it("hasBlobCid returns false for untracked CID", () => { 54 + expect(storage.hasBlobCid("did:plc:test1", "bafkreix")).toBe(false); 55 + }); 56 + 57 + it("getBlobCids returns all tracked CIDs", () => { 58 + const did = "did:plc:test1"; 59 + 
storage.trackBlobs(did, ["bafkreia", "bafkreib", "bafkreic"]); 60 + const cids = storage.getBlobCids(did); 61 + expect(cids).toHaveLength(3); 62 + expect(cids).toContain("bafkreia"); 63 + expect(cids).toContain("bafkreib"); 64 + expect(cids).toContain("bafkreic"); 65 + }); 66 + 67 + it("duplicate CIDs are silently ignored", () => { 68 + const did = "did:plc:test1"; 69 + storage.trackBlobs(did, ["bafkreia", "bafkreia"]); 70 + storage.trackBlobs(did, ["bafkreia"]); 71 + expect(storage.getBlobCount(did)).toBe(1); 72 + }); 73 + 74 + it("tracking is per-DID (independent)", () => { 75 + storage.trackBlobs("did:plc:a", ["bafkreia"]); 76 + storage.trackBlobs("did:plc:b", ["bafkreib"]); 77 + expect(storage.hasBlobCid("did:plc:a", "bafkreia")).toBe(true); 78 + expect(storage.hasBlobCid("did:plc:a", "bafkreib")).toBe(false); 79 + expect(storage.hasBlobCid("did:plc:b", "bafkreib")).toBe(true); 80 + expect(storage.hasBlobCid("did:plc:b", "bafkreia")).toBe(false); 81 + }); 82 + 83 + it("getBlobCount returns correct count", () => { 84 + const did = "did:plc:test1"; 85 + expect(storage.getBlobCount(did)).toBe(0); 86 + storage.trackBlobs(did, ["bafkreia", "bafkreib"]); 87 + expect(storage.getBlobCount(did)).toBe(2); 88 + }); 89 + }); 90 + 91 + // ============================================ 92 + // RepoFetcher.fetchBlob 93 + // ============================================ 94 + 95 + describe("RepoFetcher.fetchBlob", () => { 96 + let fetcher: RepoFetcher; 97 + 98 + beforeEach(() => { 99 + // Create RepoFetcher with a stub DidResolver (not used by fetchBlob) 100 + fetcher = new RepoFetcher({ 101 + resolve: async () => null, 102 + } as any); 103 + }); 104 + 105 + afterEach(() => { 106 + vi.restoreAllMocks(); 107 + }); 108 + 109 + it("returns bytes on 200", async () => { 110 + const blobBytes = new Uint8Array([1, 2, 3, 4, 5]); 111 + vi.spyOn(globalThis, "fetch").mockResolvedValue( 112 + new Response(blobBytes, { status: 200 }) as unknown as Response, 113 + ); 114 + 115 + const result = 
await fetcher.fetchBlob( 116 + "https://pds.example.com", 117 + "did:plc:test", 118 + "bafkreia", 119 + ); 120 + expect(result).toBeInstanceOf(Uint8Array); 121 + expect(result).toEqual(blobBytes); 122 + }); 123 + 124 + it("returns null on 404", async () => { 125 + vi.spyOn(globalThis, "fetch").mockResolvedValue( 126 + new Response(null, { status: 404 }) as unknown as Response, 127 + ); 128 + 129 + const result = await fetcher.fetchBlob( 130 + "https://pds.example.com", 131 + "did:plc:test", 132 + "bafkreia", 133 + ); 134 + expect(result).toBeNull(); 135 + }); 136 + 137 + it("throws on 500", async () => { 138 + vi.spyOn(globalThis, "fetch").mockResolvedValue( 139 + new Response(null, { status: 500, statusText: "Internal Server Error" }) as unknown as Response, 140 + ); 141 + 142 + await expect( 143 + fetcher.fetchBlob("https://pds.example.com", "did:plc:test", "bafkreia"), 144 + ).rejects.toThrow("Failed to fetch blob"); 145 + }); 146 + }); 147 + 148 + // ============================================ 149 + // CID verification 150 + // ============================================ 151 + 152 + describe("CID verification", () => { 153 + it("correct bytes pass verification", async () => { 154 + const bytes = new Uint8Array([10, 20, 30, 40, 50]); 155 + const expectedCid = await makeCidStr(bytes); 156 + 157 + // Recompute and compare 158 + const actualCid = await createCid(CODEC_RAW, bytes); 159 + expect(cidToString(actualCid)).toBe(expectedCid); 160 + }); 161 + 162 + it("wrong bytes fail verification", async () => { 163 + const bytes = new Uint8Array([10, 20, 30, 40, 50]); 164 + const expectedCid = await makeCidStr(bytes); 165 + 166 + const wrongBytes = new Uint8Array([99, 99, 99]); 167 + const actualCid = await createCid(CODEC_RAW, wrongBytes); 168 + expect(cidToString(actualCid)).not.toBe(expectedCid); 169 + }); 170 + }); 171 + 172 + // ============================================ 173 + // Blob sync integration 174 + // ============================================ 175 + 
176 + describe("syncBlobs integration", () => { 177 + let tmpDir: string; 178 + let db: InstanceType<typeof Database>; 179 + let syncStorage: SyncStorage; 180 + let ipfsService: IpfsService; 181 + 182 + beforeEach(async () => { 183 + tmpDir = mkdtempSync(join(tmpdir(), "blob-integration-test-")); 184 + db = new Database(join(tmpDir, "test.db")); 185 + syncStorage = new SyncStorage(db); 186 + syncStorage.initSchema(); 187 + 188 + ipfsService = new IpfsService({ 189 + blocksPath: join(tmpDir, "blocks"), 190 + datastorePath: join(tmpDir, "datastore"), 191 + networking: false, 192 + }); 193 + await ipfsService.start(); 194 + }); 195 + 196 + afterEach(async () => { 197 + await ipfsService.stop(); 198 + db.close(); 199 + try { 200 + rmSync(tmpDir, { recursive: true, force: true }); 201 + } catch {} 202 + }); 203 + 204 + it("stores blob in BlockStore and tracks in SyncStorage", async () => { 205 + const did = "did:plc:blobtest"; 206 + const blobBytes = new Uint8Array([72, 101, 108, 108, 111]); // "Hello" 207 + const blobCid = await makeCidStr(blobBytes); 208 + 209 + // Store blob in BlockStore 210 + await ipfsService.putBlock(blobCid, blobBytes); 211 + 212 + // Track in SyncStorage 213 + syncStorage.trackBlobs(did, [blobCid]); 214 + 215 + // Verify it's tracked 216 + expect(syncStorage.hasBlobCid(did, blobCid)).toBe(true); 217 + 218 + // Verify it's in BlockStore 219 + const retrieved = await ipfsService.getBlock(blobCid); 220 + expect(new Uint8Array(retrieved!)).toEqual(blobBytes); 221 + }); 222 + 223 + it("already-fetched blobs are skipped (incremental)", async () => { 224 + const did = "did:plc:blobtest"; 225 + const blobBytes = new Uint8Array([1, 2, 3]); 226 + const blobCid = await makeCidStr(blobBytes); 227 + 228 + // First time: track 229 + syncStorage.trackBlobs(did, [blobCid]); 230 + expect(syncStorage.hasBlobCid(did, blobCid)).toBe(true); 231 + 232 + // Second time: hasBlobCid returns true, so skip 233 + expect(syncStorage.hasBlobCid(did, blobCid)).toBe(true); 
234 + expect(syncStorage.getBlobCount(did)).toBe(1); 235 + }); 236 + }); 237 + 238 + // ============================================ 239 + // getBlob endpoint (replicated blobs) 240 + // ============================================ 241 + 242 + describe("getBlob replicated blobs", () => { 243 + let tmpDir: string; 244 + let db: InstanceType<typeof Database>; 245 + let syncStorage: SyncStorage; 246 + let ipfsService: IpfsService; 247 + 248 + beforeEach(async () => { 249 + tmpDir = mkdtempSync(join(tmpdir(), "getblob-test-")); 250 + db = new Database(join(tmpDir, "test.db")); 251 + syncStorage = new SyncStorage(db); 252 + syncStorage.initSchema(); 253 + 254 + ipfsService = new IpfsService({ 255 + blocksPath: join(tmpDir, "blocks"), 256 + datastorePath: join(tmpDir, "datastore"), 257 + networking: false, 258 + }); 259 + await ipfsService.start(); 260 + }); 261 + 262 + afterEach(async () => { 263 + await ipfsService.stop(); 264 + db.close(); 265 + try { 266 + rmSync(tmpDir, { recursive: true, force: true }); 267 + } catch {} 268 + }); 269 + 270 + it("returns replicated blob bytes when tracked", async () => { 271 + const did = "did:plc:replicated"; 272 + const blobBytes = new Uint8Array([0x89, 0x50, 0x4e, 0x47]); // PNG magic bytes 273 + const blobCid = await makeCidStr(blobBytes); 274 + 275 + // Store and track 276 + await ipfsService.putBlock(blobCid, blobBytes); 277 + syncStorage.trackBlobs(did, [blobCid]); 278 + 279 + // Verify tracking 280 + expect(syncStorage.hasBlobCid(did, blobCid)).toBe(true); 281 + 282 + // Verify retrieval 283 + const bytes = await ipfsService.getBlock(blobCid); 284 + expect(new Uint8Array(bytes!)).toEqual(blobBytes); 285 + }); 286 + 287 + it("returns null for untracked CID", async () => { 288 + const did = "did:plc:replicated"; 289 + const blobCid = "bafkreiuntracked"; 290 + 291 + // Not tracked 292 + expect(syncStorage.hasBlobCid(did, blobCid)).toBe(false); 293 + 294 + // BlockStore returns null for unknown CID 295 + const bytes = await 
ipfsService.getBlock(blobCid); 296 + expect(bytes).toBeNull(); 297 + }); 298 + });
+151
src/replication/replication-manager.ts
··· 7 7 import type Database from "better-sqlite3"; 8 8 import type { Config } from "../config.js"; 9 9 import type { RepoManager } from "../repo-manager.js"; 10 + import { extractBlobCids } from "../repo-manager.js"; 11 + import { create as createCid, CODEC_RAW, toString as cidToString } from "@atcute/cid"; 10 12 import type { BlockStore, NetworkService } from "../ipfs.js"; 11 13 import type { DidResolver } from "../did-resolver.js"; 12 14 import { readCarWithRoot } from "@atproto/repo"; ··· 463 465 }; 464 466 await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 465 467 } 468 + 469 + // 11. Sync blobs (fire-and-forget) 470 + this.syncBlobs(did).catch((err) => { 471 + console.warn( 472 + `[replication] Blob sync error for ${did}:`, 473 + err instanceof Error ? err.message : String(err), 474 + ); 475 + }); 466 476 } 467 477 468 478 /** ··· 753 763 }; 754 764 await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 755 765 } 766 + 767 + // 10. Sync blobs for firehose ops (fire-and-forget) 768 + const pdsEndpoint = this.syncStorage.getState(did)?.pdsEndpoint; 769 + if (pdsEndpoint) { 770 + this.syncBlobsForOps(did, pdsEndpoint, event.ops).catch((err) => { 771 + console.warn( 772 + `[replication] Blob sync error for firehose ops (${did}):`, 773 + err instanceof Error ? err.message : String(err), 774 + ); 775 + }); 776 + } 756 777 } 757 778 758 779 /** ··· 889 910 */ 890 911 getVerificationResults(): Map<string, LayeredVerificationResult> { 891 912 return this.lastVerificationResults; 913 + } 914 + 915 + // ============================================ 916 + // Blob replication 917 + // ============================================ 918 + 919 + /** 920 + * Sync blobs for a DID: walk all records, extract blob CIDs, fetch new ones. 
921 + */ 922 + async syncBlobs(did: string): Promise<{ fetched: number; skipped: number; errors: number }> { 923 + const state = this.syncStorage.getState(did); 924 + if (!state?.pdsEndpoint || !state.rootCid) { 925 + return { fetched: 0, skipped: 0, errors: 0 }; 926 + } 927 + 928 + if (!this.replicatedRepoReader) { 929 + return { fetched: 0, skipped: 0, errors: 0 }; 930 + } 931 + 932 + // Walk all records and collect blob CIDs 933 + const allBlobCids = new Set<string>(); 934 + const repo = await this.replicatedRepoReader.getRepo(did); 935 + if (!repo) { 936 + return { fetched: 0, skipped: 0, errors: 0 }; 937 + } 938 + 939 + for await (const entry of repo.walkRecords()) { 940 + const cids = extractBlobCids(entry.record); 941 + for (const cid of cids) { 942 + allBlobCids.add(cid); 943 + } 944 + } 945 + 946 + // Filter to blobs not already fetched 947 + let fetched = 0; 948 + let skipped = 0; 949 + let errors = 0; 950 + 951 + for (const blobCid of allBlobCids) { 952 + if (this.syncStorage.hasBlobCid(did, blobCid)) { 953 + skipped++; 954 + continue; 955 + } 956 + 957 + try { 958 + const bytes = await this.repoFetcher.fetchBlob( 959 + state.pdsEndpoint, 960 + did, 961 + blobCid, 962 + ); 963 + if (!bytes) { 964 + skipped++; // 404 — blob deleted upstream 965 + continue; 966 + } 967 + 968 + if (!await this.verifyBlobCid(blobCid, bytes)) { 969 + console.warn(`[replication] Blob CID mismatch for ${blobCid} (${did})`); 970 + errors++; 971 + continue; 972 + } 973 + 974 + await this.blockStore.putBlock(blobCid, bytes); 975 + this.syncStorage.trackBlobs(did, [blobCid]); 976 + this.networkService.provideBlocks([blobCid]).catch(() => {}); 977 + fetched++; 978 + } catch (err) { 979 + console.warn( 980 + `[replication] Failed to fetch blob ${blobCid} for ${did}:`, 981 + err instanceof Error ? 
err.message : String(err), 982 + ); 983 + errors++; 984 + } 985 + } 986 + 987 + return { fetched, skipped, errors }; 988 + } 989 + 990 + /** 991 + * Sync blobs for specific firehose ops (create/update only). 992 + */ 993 + private async syncBlobsForOps( 994 + did: string, 995 + pdsEndpoint: string, 996 + ops: Array<{ action: string; path: string }>, 997 + ): Promise<void> { 998 + if (!this.replicatedRepoReader) return; 999 + 1000 + for (const op of ops) { 1001 + if (op.action !== "create" && op.action !== "update") continue; 1002 + 1003 + const parts = op.path.split("/"); 1004 + if (parts.length < 2) continue; 1005 + const collection = parts.slice(0, -1).join("/"); 1006 + const rkey = parts[parts.length - 1]!; 1007 + 1008 + try { 1009 + const record = await this.replicatedRepoReader.getRecord(did, collection, rkey); 1010 + if (!record) continue; 1011 + 1012 + const blobCids = extractBlobCids(record.value); 1013 + for (const blobCid of blobCids) { 1014 + if (this.syncStorage.hasBlobCid(did, blobCid)) continue; 1015 + 1016 + const bytes = await this.repoFetcher.fetchBlob(pdsEndpoint, did, blobCid); 1017 + if (!bytes) continue; 1018 + 1019 + if (!await this.verifyBlobCid(blobCid, bytes)) { 1020 + console.warn(`[replication] Blob CID mismatch for ${blobCid} (${did})`); 1021 + continue; 1022 + } 1023 + 1024 + await this.blockStore.putBlock(blobCid, bytes); 1025 + this.syncStorage.trackBlobs(did, [blobCid]); 1026 + this.networkService.provideBlocks([blobCid]).catch(() => {}); 1027 + } 1028 + } catch (err) { 1029 + console.warn( 1030 + `[replication] Failed to sync blobs for op ${op.path} (${did}):`, 1031 + err instanceof Error ? err.message : String(err), 1032 + ); 1033 + } 1034 + } 1035 + } 1036 + 1037 + /** 1038 + * Verify that blob bytes match the expected CID. 
1039 + */ 1040 + private async verifyBlobCid(expectedCid: string, bytes: Uint8Array): Promise<boolean> { 1041 + const actualCid = await createCid(CODEC_RAW, bytes); 1042 + return cidToString(actualCid) === expectedCid; 892 1043 } 893 1044 894 1045 // ============================================
+29
src/replication/repo-fetcher.ts
··· 71 71 } 72 72 73 73 /** 74 + * Fetch a blob from a remote PDS. 75 + * Returns null on 404 (blob deleted upstream), throws on other errors. 76 + */ 77 + async fetchBlob( 78 + pdsEndpoint: string, 79 + did: string, 80 + cid: string, 81 + ): Promise<Uint8Array | null> { 82 + const url = new URL( 83 + "/xrpc/com.atproto.sync.getBlob", 84 + pdsEndpoint, 85 + ); 86 + url.searchParams.set("did", did); 87 + url.searchParams.set("cid", cid); 88 + 89 + const res = await fetch(url.toString()); 90 + if (res.status === 404) { 91 + return null; 92 + } 93 + if (!res.ok) { 94 + throw new Error( 95 + `Failed to fetch blob ${cid} for ${did}: ${res.status} ${res.statusText}`, 96 + ); 97 + } 98 + 99 + return new Uint8Array(await res.arrayBuffer()); 100 + } 101 + 102 + /** 74 103 * List records in a collection from a remote PDS. 75 104 * Uses com.atproto.repo.listRecords XRPC endpoint. 76 105 */
+64
src/replication/sync-storage.ts
··· 44 44 ); 45 45 `); 46 46 47 + // Blob tracking table: tracks replicated blob CIDs per DID. 48 + this.db.exec(` 49 + CREATE TABLE IF NOT EXISTS replication_blobs ( 50 + did TEXT NOT NULL, 51 + cid TEXT NOT NULL, 52 + fetched_at TEXT NOT NULL DEFAULT (datetime('now')), 53 + PRIMARY KEY (did, cid) 54 + ); 55 + `); 56 + 47 57 // Record paths table: tracks record paths per DID for challenge generation. 48 58 this.db.exec(` 49 59 CREATE TABLE IF NOT EXISTS replication_record_paths ( ··· 322 332 this.db 323 333 .prepare("DELETE FROM replication_record_paths WHERE did = ?") 324 334 .run(did); 335 + } 336 + 337 + // ============================================ 338 + // Blob tracking (for replicated blob CIDs) 339 + // ============================================ 340 + 341 + /** 342 + * Track blob CIDs for a DID (batch insert, ignores duplicates). 343 + */ 344 + trackBlobs(did: string, cids: string[]): void { 345 + if (cids.length === 0) return; 346 + const insert = this.db.prepare( 347 + "INSERT OR IGNORE INTO replication_blobs (did, cid) VALUES (?, ?)", 348 + ); 349 + const batch = this.db.transaction((items: string[]) => { 350 + for (const cid of items) { 351 + insert.run(did, cid); 352 + } 353 + }); 354 + batch(cids); 355 + } 356 + 357 + /** 358 + * Check if a blob CID has been fetched for a DID. 359 + */ 360 + hasBlobCid(did: string, cid: string): boolean { 361 + const row = this.db 362 + .prepare( 363 + "SELECT 1 FROM replication_blobs WHERE did = ? AND cid = ?", 364 + ) 365 + .get(did, cid); 366 + return row !== undefined; 367 + } 368 + 369 + /** 370 + * Get all tracked blob CIDs for a DID. 371 + */ 372 + getBlobCids(did: string): string[] { 373 + const rows = this.db 374 + .prepare("SELECT cid FROM replication_blobs WHERE did = ?") 375 + .all(did) as Array<{ cid: string }>; 376 + return rows.map((r) => r.cid); 377 + } 378 + 379 + /** 380 + * Get the count of tracked blobs for a DID. 
381 + */ 382 + getBlobCount(did: string): number { 383 + const row = this.db 384 + .prepare( 385 + "SELECT COUNT(*) as count FROM replication_blobs WHERE did = ?", 386 + ) 387 + .get(did) as { count: number }; 388 + return row.count; 325 389 } 326 390 327 391 private rowToState(row: Record<string, unknown>): SyncState {
+1 -1
src/repo-manager.ts
··· 881 881 /** 882 882 * Extract blob CIDs from a record by recursively searching for blob references. 883 883 */ 884 - function extractBlobCids(obj: unknown): string[] { 884 + export function extractBlobCids(obj: unknown): string[] { 885 885 const cids: string[] = []; 886 886 887 887 function walk(value: unknown): void {
+50 -29
src/xrpc/sync.ts
··· 4 4 import type { AppEnv } from "../types.js"; 5 5 import { detectContentType } from "../format.js"; 6 6 import type { ReplicatedRepoReader } from "../replication/replicated-repo-reader.js"; 7 + import type { BlockStore } from "../ipfs.js"; 8 + import type { SyncStorage } from "../replication/sync-storage.js"; 7 9 8 10 export async function getRepo( 9 11 c: Context<AppEnv>, ··· 222 224 export async function getBlob( 223 225 c: Context<AppEnv>, 224 226 repoManager: RepoManager, 227 + blockStore?: BlockStore, 228 + syncStorage?: SyncStorage, 225 229 ): Promise<Response> { 226 230 const did = c.req.query("did"); 227 231 const cid = c.req.query("cid"); ··· 240 244 ); 241 245 } 242 246 243 - if (did !== c.env.DID) { 244 - return c.json( 245 - { error: "RepoNotFound", message: `Repository not found for DID: ${did}` }, 246 - 404, 247 - ); 248 - } 247 + // Local DID: serve from filesystem BlobStore 248 + if (did === c.env.DID) { 249 + if (!repoManager.blobStore) { 250 + return c.json( 251 + { error: "ServiceUnavailable", message: "Blob storage is not configured" }, 252 + 503, 253 + ); 254 + } 249 255 250 - if (!repoManager.blobStore) { 251 - return c.json( 252 - { error: "ServiceUnavailable", message: "Blob storage is not configured" }, 253 - 503, 254 - ); 255 - } 256 + const blob = repoManager.blobStore.getBlob(cid); 257 + 258 + if (!blob) { 259 + return c.json( 260 + { error: "BlobNotFound", message: `Blob not found: ${cid}` }, 261 + 404, 262 + ); 263 + } 256 264 257 - const blob = repoManager.blobStore.getBlob(cid); 265 + let contentType = blob.mimeType; 266 + if (!contentType || contentType === "*/*") { 267 + contentType = 268 + detectContentType(blob.bytes) || "application/octet-stream"; 269 + } 258 270 259 - if (!blob) { 260 - return c.json( 261 - { error: "BlobNotFound", message: `Blob not found: ${cid}` }, 262 - 404, 263 - ); 271 + return new Response(blob.bytes, { 272 + status: 200, 273 + headers: { 274 + "Content-Type": contentType, 275 + "Content-Length": 
blob.size.toString(), 276 + }, 277 + }); 264 278 } 265 279 266 - let contentType = blob.mimeType; 267 - if (!contentType || contentType === "*/*") { 268 - contentType = 269 - detectContentType(blob.bytes) || "application/octet-stream"; 280 + // Replicated DID: serve from BlockStore if tracked 281 + if (blockStore && syncStorage && syncStorage.hasBlobCid(did, cid)) { 282 + const bytes = await blockStore.getBlock(cid); 283 + if (bytes) { 284 + const contentType = 285 + detectContentType(bytes) || "application/octet-stream"; 286 + return new Response(bytes, { 287 + status: 200, 288 + headers: { 289 + "Content-Type": contentType, 290 + "Content-Length": bytes.length.toString(), 291 + }, 292 + }); 293 + } 270 294 } 271 295 272 - return new Response(blob.bytes, { 273 - status: 200, 274 - headers: { 275 - "Content-Type": contentType, 276 - "Content-Length": blob.size.toString(), 277 - }, 278 - }); 296 + return c.json( 297 + { error: "BlobNotFound", message: `Blob not found: ${cid}` }, 298 + 404, 299 + ); 279 300 } 280 301 281 302 export async function getRecord(