atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 206 lines 5.2 kB view raw
1/** 2 * Service that loads and caches ReadableRepo instances for replicated DIDs. 3 * Provides read-only access to replicated repo data via IPFS blocks. 4 */ 5 6import { CID } from "multiformats"; 7import { ReadableRepo } from "@atproto/repo/dist/readable-repo.js"; 8import type { BlockStore } from "../ipfs.js"; 9import { IpfsReadableBlockstore } from "./ipfs-readable-blockstore.js"; 10import type { SyncStorage } from "./sync-storage.js"; 11 12export class ReplicatedRepoReader { 13 private cache = new Map<string, ReadableRepo>(); 14 private readableBlockstore: IpfsReadableBlockstore; 15 16 constructor( 17 blockStore: BlockStore, 18 private syncStorage: SyncStorage, 19 ) { 20 this.readableBlockstore = new IpfsReadableBlockstore(blockStore); 21 } 22 23 /** 24 * Check if a DID is tracked for replication. 25 */ 26 isReplicatedDid(did: string): boolean { 27 const state = this.syncStorage.getState(did); 28 return state !== null; 29 } 30 31 /** 32 * Load a ReadableRepo for a replicated DID (cached until invalidated). 33 */ 34 async getRepo(did: string): Promise<ReadableRepo | null> { 35 const cached = this.cache.get(did); 36 if (cached) return cached; 37 38 const state = this.syncStorage.getState(did); 39 if (!state?.rootCid) return null; 40 41 try { 42 const commitCid = CID.parse(state.rootCid); 43 const repo = await ReadableRepo.load( 44 this.readableBlockstore, 45 commitCid, 46 ); 47 this.cache.set(did, repo); 48 return repo; 49 } catch { 50 return null; 51 } 52 } 53 54 /** 55 * Get a single record from a replicated repo. 56 */ 57 async getRecord( 58 did: string, 59 collection: string, 60 rkey: string, 61 ): Promise<{ cid: string; value: unknown } | null> { 62 const repo = await this.getRepo(did); 63 if (!repo) return null; 64 65 // Get the record CID from the MST 66 const key = `${collection}/${rkey}`; 67 const recordCid = await repo.data.get(key); 68 if (!recordCid) return null; 69 70 // Read the record value 71 const value = await repo.getRecord(collection, rkey); 72 if (value === null) return null; 73 74 return { cid: recordCid.toString(), value }; 75 } 76 77 /** 78 * List records from a collection in a replicated repo. 79 */ 80 async listRecords( 81 did: string, 82 collection: string, 83 opts: { limit?: number; cursor?: string; reverse?: boolean } = {}, 84 ): Promise<{ 85 records: Array<{ uri: string; cid: string; value: unknown }>; 86 cursor?: string; 87 }> { 88 const repo = await this.getRepo(did); 89 if (!repo) return { records: [] }; 90 91 const limit = opts.limit ?? 50; 92 const prefix = `${collection}/`; 93 const records: Array<{ uri: string; cid: string; value: unknown }> = []; 94 95 // Use walkRecords with a starting key for cursor support 96 const startKey = opts.cursor 97 ? `${collection}/${opts.cursor}` 98 : collection; 99 100 // Collect matching records via MST list 101 // MST.list(count, after, before) returns Leaf[] sorted by key 102 const leaves = opts.reverse 103 ? await repo.data.list(undefined, undefined, `${collection}0`) 104 : await repo.data.list( 105 undefined, 106 opts.cursor ? `${collection}/${opts.cursor}` : undefined, 107 ); 108 109 for (const leaf of leaves) { 110 if (records.length >= limit) break; 111 if (!leaf.key.startsWith(prefix)) { 112 // If we've passed the collection prefix, stop 113 if (leaf.key > prefix + "\xff") break; 114 continue; 115 } 116 117 const rkey = leaf.key.slice(prefix.length); 118 const value = await this.readableBlockstore.attemptReadRecord( 119 leaf.value, 120 ); 121 if (value !== null) { 122 records.push({ 123 uri: `at://${did}/${collection}/${rkey}`, 124 cid: leaf.value.toString(), 125 value, 126 }); 127 } 128 } 129 130 if (opts.reverse) { 131 records.reverse(); 132 } 133 134 const result: { 135 records: Array<{ uri: string; cid: string; value: unknown }>; 136 cursor?: string; 137 } = { records }; 138 139 if (records.length === limit) { 140 const lastRecord = records[records.length - 1]!; 141 const lastUri = lastRecord.uri; 142 // cursor is the rkey of the last record 143 result.cursor = lastUri.split("/").pop()!; 144 } 145 146 return result; 147 } 148 149 /** 150 * Describe a replicated repo (list collections). 151 */ 152 async describeRepo( 153 did: string, 154 ): Promise<{ did: string; collections: string[] } | null> { 155 const repo = await this.getRepo(did); 156 if (!repo) return null; 157 158 const collections = new Set<string>(); 159 for await (const entry of repo.walkRecords()) { 160 collections.add(entry.collection); 161 } 162 163 return { did, collections: Array.from(collections).sort() }; 164 } 165 166 /** 167 * Get repo status for a replicated DID. 168 */ 169 getRepoStatus( 170 did: string, 171 ): { did: string; rev: string | null; rootCid: string | null; active: boolean } | null { 172 const state = this.syncStorage.getState(did); 173 if (!state) return null; 174 175 return { 176 did, 177 rev: state.lastSyncRev, 178 rootCid: state.rootCid, 179 active: state.status === "synced", 180 }; 181 } 182 183 /** 184 * Get repo statuses for all replicated DIDs. 185 */ 186 getAllRepoStatuses(): Array<{ 187 did: string; 188 rev: string | null; 189 rootCid: string | null; 190 active: boolean; 191 }> { 192 return this.syncStorage.getAllStates().map((state) => ({ 193 did: state.did, 194 rev: state.lastSyncRev, 195 rootCid: state.rootCid, 196 active: state.status === "synced", 197 })); 198 } 199 200 /** 201 * Invalidate the cached ReadableRepo for a DID (call after sync). 202 */ 203 invalidateCache(did: string): void { 204 this.cache.delete(did); 205 } 206}