atproto user agency toolkit for individuals and groups
1/**
2 * Service that loads and caches ReadableRepo instances for replicated DIDs.
3 * Provides read-only access to replicated repo data via IPFS blocks.
4 */
5
6import { CID } from "multiformats";
7import { ReadableRepo } from "@atproto/repo/dist/readable-repo.js";
8import type { BlockStore } from "../ipfs.js";
9import { IpfsReadableBlockstore } from "./ipfs-readable-blockstore.js";
10import type { SyncStorage } from "./sync-storage.js";
11
/**
 * Read-only view over repos that this node replicates.
 *
 * Each replicated DID's latest commit (as recorded in `SyncStorage`) is loaded
 * into a `ReadableRepo` backed by the IPFS block store. Loaded repos are cached
 * per DID until `invalidateCache` is called, so callers must invalidate after
 * every successful sync or they will keep reading the previous commit.
 */
export class ReplicatedRepoReader {
  /** Loaded repos keyed by DID; entries live until `invalidateCache(did)`. */
  private readonly cache = new Map<string, ReadableRepo>();
  /** Block store adapter shared by every loaded repo. */
  private readonly readableBlockstore: IpfsReadableBlockstore;

  /**
   * @param blockStore IPFS block store holding the replicated repo blocks.
   * @param syncStorage Source of per-DID sync state (root CID, rev, status).
   */
  constructor(
    blockStore: BlockStore,
    private readonly syncStorage: SyncStorage,
  ) {
    this.readableBlockstore = new IpfsReadableBlockstore(blockStore);
  }

  /**
   * Check if a DID is tracked for replication.
   *
   * @returns `true` when sync storage has any state for `did`, regardless of
   *   whether a root commit has been fetched yet.
   */
  isReplicatedDid(did: string): boolean {
    return this.syncStorage.getState(did) !== null;
  }

  /**
   * Load a ReadableRepo for a replicated DID (cached until invalidated).
   *
   * @returns The repo at the root CID recorded in sync storage, or `null` when
   *   the DID has no recorded root CID or the commit cannot be loaded.
   */
  async getRepo(did: string): Promise<ReadableRepo | null> {
    const cached = this.cache.get(did);
    if (cached) return cached;

    const state = this.syncStorage.getState(did);
    if (!state?.rootCid) return null;

    try {
      const commitCid = CID.parse(state.rootCid);
      const repo = await ReadableRepo.load(this.readableBlockstore, commitCid);
      this.cache.set(did, repo);
      return repo;
    } catch {
      // Deliberate best-effort: an unparsable root CID or a commit whose
      // blocks are not available locally is reported as "no repo" rather
      // than surfaced as an error.
      return null;
    }
  }

  /**
   * Get a single record from a replicated repo.
   *
   * @returns The record's CID and decoded value, or `null` when the repo is
   *   unavailable, the key is absent from the MST, or the record block cannot
   *   be read.
   */
  async getRecord(
    did: string,
    collection: string,
    rkey: string,
  ): Promise<{ cid: string; value: unknown } | null> {
    const repo = await this.getRepo(did);
    if (!repo) return null;

    // Resolve the record CID from the MST once, then read the block directly
    // from the shared blockstore instead of performing a second MST lookup.
    const recordCid = await repo.data.get(`${collection}/${rkey}`);
    if (!recordCid) return null;

    const value = await this.readableBlockstore.attemptReadRecord(recordCid);
    if (value === null) return null;

    return { cid: recordCid.toString(), value };
  }

  /**
   * List records from a collection in a replicated repo.
   *
   * Records are returned in ascending rkey order by default and in descending
   * rkey order when `opts.reverse` is set. `opts.cursor` is the rkey of the
   * last record of the previous page and is exclusive in both directions.
   * Leaves whose record block cannot be read are skipped.
   *
   * @param opts.limit Maximum number of records to return (default 50). A
   *   limit that is not a positive number yields an empty page.
   * @returns The page of records, plus a `cursor` (rkey of the last returned
   *   record) whenever the page is full.
   */
  async listRecords(
    did: string,
    collection: string,
    opts: { limit?: number; cursor?: string; reverse?: boolean } = {},
  ): Promise<{
    records: Array<{ uri: string; cid: string; value: unknown }>;
    cursor?: string;
  }> {
    const repo = await this.getRepo(did);
    if (!repo) return { records: [] };

    const records: Array<{ uri: string; cid: string; value: unknown }> = [];
    const result: {
      records: Array<{ uri: string; cid: string; value: unknown }>;
      cursor?: string;
    } = { records };

    const limit = Math.floor(opts.limit ?? 50);
    // Also catches NaN; an empty page never carries a cursor.
    if (!(limit > 0)) return result;

    // Every key of the collection sorts strictly between `<collection>/` and
    // `<collection>0`, because "0" is the character right after "/".
    const prefix = `${collection}/`;
    const collectionEnd = `${collection}0`;
    const cursorKey = opts.cursor ? `${prefix}${opts.cursor}` : undefined;
    let lastKey: string | undefined;

    if (opts.reverse) {
      // MST.list only walks forward, so fetch the leaves below the cursor
      // (or the whole collection) and consume them from the end.
      const leaves = await repo.data.list(
        undefined,
        prefix,
        cursorKey ?? collectionEnd,
      );
      for (let i = leaves.length - 1; i >= 0 && records.length < limit; i--) {
        const leaf = leaves[i];
        if (leaf === undefined) continue;
        const record = await this.readLeafRecord(did, leaf);
        if (record !== null) {
          records.push(record);
          lastKey = leaf.key;
        }
      }
    } else {
      // Fetch only as many leaves as are still needed; go back for more only
      // when some of them turned out to be unreadable.
      let after = cursorKey ?? prefix;
      while (records.length < limit) {
        const wanted = limit - records.length;
        const batch = await repo.data.list(wanted, after, collectionEnd);
        for (const leaf of batch) {
          const record = await this.readLeafRecord(did, leaf);
          if (record !== null) {
            records.push(record);
            lastKey = leaf.key;
          }
        }
        const tail = batch[batch.length - 1];
        // A short batch means the end of the collection was reached.
        if (tail === undefined || batch.length < wanted) break;
        after = tail.key;
      }
    }

    if (records.length >= limit && lastKey !== undefined) {
      // cursor is the rkey of the last record
      result.cursor = lastKey.slice(prefix.length);
    }

    return result;
  }

  /**
   * Describe a replicated repo (list collections).
   *
   * Only MST keys are inspected; record blocks are never read, so the result
   * does not depend on every record block being available locally.
   *
   * @returns The DID and its collection names sorted ascending, or `null`
   *   when the repo is unavailable.
   */
  async describeRepo(
    did: string,
  ): Promise<{ did: string; collections: string[] } | null> {
    const repo = await this.getRepo(did);
    if (!repo) return null;

    const collections = new Set<string>();
    let after: string | undefined;
    for (;;) {
      // Take the first leaf past the previous collection, then jump over the
      // rest of that collection's keys (all of which sort below `<name>0`).
      const [leaf] = await repo.data.list(1, after);
      if (leaf === undefined) break;
      const slash = leaf.key.indexOf("/");
      const collection = slash === -1 ? leaf.key : leaf.key.slice(0, slash);
      collections.add(collection);
      after = `${collection}0`;
    }

    return { did, collections: Array.from(collections).sort() };
  }

  /**
   * Get repo status for a replicated DID.
   *
   * @returns `null` when the DID is not tracked; otherwise its last synced
   *   rev and root CID, with `active` true only when the sync status is
   *   `"synced"`.
   */
  getRepoStatus(
    did: string,
  ): { did: string; rev: string | null; rootCid: string | null; active: boolean } | null {
    const state = this.syncStorage.getState(did);
    if (!state) return null;

    return {
      did,
      rev: state.lastSyncRev,
      rootCid: state.rootCid,
      active: state.status === "synced",
    };
  }

  /**
   * Get repo statuses for all replicated DIDs.
   *
   * @returns One entry per state in sync storage, in the same shape as
   *   `getRepoStatus`.
   */
  getAllRepoStatuses(): Array<{
    did: string;
    rev: string | null;
    rootCid: string | null;
    active: boolean;
  }> {
    return this.syncStorage.getAllStates().map((state) => ({
      did: state.did,
      rev: state.lastSyncRev,
      rootCid: state.rootCid,
      active: state.status === "synced",
    }));
  }

  /**
   * Invalidate the cached ReadableRepo for a DID (call after sync).
   */
  invalidateCache(did: string): void {
    this.cache.delete(did);
  }

  /** Read one MST leaf's record block; `null` when the block is unreadable. */
  private async readLeafRecord(
    did: string,
    leaf: { key: string; value: CID },
  ): Promise<{ uri: string; cid: string; value: unknown } | null> {
    const value = await this.readableBlockstore.attemptReadRecord(leaf.value);
    if (value === null) return null;
    // The MST key is already `<collection>/<rkey>`.
    return { uri: `at://${did}/${leaf.key}`, cid: leaf.value.toString(), value };
  }
}
206}