open-source, lexicon-agnostic PDS for AI agents. welcome-mat enrollment, AT Proto federation.
agents
atprotocol
pds
cloudflare
1import { CID } from "@atproto/lex-data";
2import {
3 BlockMap,
4 ReadableBlockstore,
5 cborToLex,
6 type CommitData,
7 type RepoStorage,
8} from "@atproto/repo";
9
/**
 * Shape of the single `repo_state` row (id = 1) as returned by
 * `SqliteRepoStorage.getState()`.
 *
 * Field names mirror the SQL column names one-to-one, hence the snake_case.
 * NOTE(review): the identity/key columns are declared without NOT NULL in the
 * schema, so they may actually be NULL at runtime until account provisioning
 * has run — confirm callers guard for that.
 */
export interface AccountState {
  // --- identity ---
  /** Account DID. */
  did: string;
  /** Account handle. */
  handle: string;

  // --- key material ---
  /** Signing key (hex string, per the column name). */
  signing_key_hex: string;
  /** Public counterpart of the signing key. */
  signing_key_pub: string;
  /** Rotation key (hex string, per the column name). */
  rotation_key_hex: string;
  /** Public counterpart of the rotation key. */
  rotation_key_pub: string;
  /** JWK thumbprint associated with the account, if any. */
  jwk_thumbprint: string | null;

  // --- repo head ---
  /** String form of the current root commit CID; `null` until a root is written. */
  root_cid: string | null;
  /** Revision of the current root commit; `null` until a root is written. */
  rev: string | null;
  /** String form of the `data` CID read from the most recently applied commit block. */
  prev_data_cid: string | null;

  // --- bookkeeping ---
  /** SQLite INTEGER flag; the schema default is 1. */
  active: number;
  /** Row creation timestamp; the schema default is `datetime('now')`. */
  created_at: string;
}
24
/**
 * Repo storage backed by a Durable Object `SqlStorage` handle.
 *
 * Tables (created by {@link SqliteRepoStorage.initSchema}):
 * - `blocks`      — repo blocks keyed by CID string, tagged with the rev that wrote them
 * - `repo_state`  — a single row (id = 1) holding account identity and the repo head
 * - `collections` — set of collection names
 * - `blobs`       — blob metadata keyed by CID string
 *
 * Every SQL call is synchronous; the `async` signatures exist to satisfy the
 * `ReadableBlockstore` / `RepoStorage` contracts.
 */
export class SqliteRepoStorage
  extends ReadableBlockstore
  implements RepoStorage
{
  /** Page size used by {@link SqliteRepoStorage.listBlobs} when no usable limit is given. */
  private static readonly DEFAULT_BLOB_PAGE_SIZE = 500;

  /** The commit most recently persisted by {@link SqliteRepoStorage.applyCommit}, or `null`. */
  lastCommit: CommitData | null = null;

  constructor(private readonly sql: SqlStorage) {
    super();
  }

  /**
   * Initialize the database schema. Called once on DO startup.
   *
   * Every statement is `IF NOT EXISTS` / `OR IGNORE`, so running it again is a
   * no-op. The `repo_state` singleton row is seeded with only its id, which
   * means the identity columns are NULL until `initAccountState` runs.
   */
  initSchema(): void {
    this.sql.exec(`
      CREATE TABLE IF NOT EXISTS blocks (
        cid TEXT PRIMARY KEY,
        bytes BLOB NOT NULL,
        rev TEXT NOT NULL
      );

      CREATE INDEX IF NOT EXISTS idx_blocks_rev ON blocks(rev);

      CREATE TABLE IF NOT EXISTS repo_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        did TEXT,
        handle TEXT,
        signing_key_hex TEXT,
        signing_key_pub TEXT,
        rotation_key_hex TEXT,
        rotation_key_pub TEXT,
        jwk_thumbprint TEXT,
        root_cid TEXT,
        rev TEXT,
        prev_data_cid TEXT,
        active INTEGER NOT NULL DEFAULT 1,
        created_at TEXT NOT NULL DEFAULT (datetime('now'))
      );

      INSERT OR IGNORE INTO repo_state (id) VALUES (1);

      CREATE TABLE IF NOT EXISTS collections (
        collection TEXT PRIMARY KEY
      );

      CREATE TABLE IF NOT EXISTS blobs (
        cid TEXT PRIMARY KEY,
        mime_type TEXT NOT NULL,
        size INTEGER NOT NULL,
        created_at TEXT NOT NULL DEFAULT (datetime('now'))
      );
    `);
  }

  /**
   * Set account-specific state in repo_state. Called during account provisioning.
   *
   * Overwrites the identity and key columns of the singleton row; the repo
   * head columns (`root_cid`, `rev`, `prev_data_cid`) are left untouched.
   */
  initAccountState(opts: {
    did: string;
    handle: string;
    signing_key_hex: string;
    signing_key_pub: string;
    rotation_key_hex: string;
    rotation_key_pub: string;
    jwk_thumbprint: string | null;
  }): void {
    this.sql.exec(
      `UPDATE repo_state SET
        did = ?, handle = ?,
        signing_key_hex = ?, signing_key_pub = ?,
        rotation_key_hex = ?, rotation_key_pub = ?,
        jwk_thumbprint = ?
      WHERE id = 1`,
      opts.did,
      opts.handle,
      opts.signing_key_hex,
      opts.signing_key_pub,
      opts.rotation_key_hex,
      opts.rotation_key_pub,
      opts.jwk_thumbprint,
    );
  }

  /**
   * Get the full account state from repo_state.
   *
   * @returns the singleton row, or `null` if the row does not exist.
   *
   * NOTE(review): the row is cast, not validated. Before `initAccountState`
   * has run, the identity/key columns are NULL even though `AccountState`
   * types them as `string` — callers should check `did` before trusting it.
   */
  getState(): AccountState | null {
    const [row] = this.sql
      .exec("SELECT * FROM repo_state WHERE id = 1")
      .toArray();
    return row ? (row as unknown as AccountState) : null;
  }

  /** Current root commit CID, or `null` when no root has been written. */
  async getRoot(): Promise<CID | null> {
    const [row] = this.sql
      .exec("SELECT root_cid FROM repo_state WHERE id = 1")
      .toArray();
    const rootCid = row?.root_cid;
    if (!rootCid) return null;
    return CID.parse(rootCid as string);
  }

  /** Current repo revision, or `null` when no root has been written. */
  async getRev(): Promise<string | null> {
    const [row] = this.sql
      .exec("SELECT rev FROM repo_state WHERE id = 1")
      .toArray();
    return (row?.rev as string | null | undefined) ?? null;
  }

  /** Raw bytes of the block stored under `cid`, or `null` if it is not stored. */
  async getBytes(cid: CID): Promise<Uint8Array | null> {
    const [row] = this.sql
      .exec("SELECT bytes FROM blocks WHERE cid = ?", cid.toString())
      .toArray();
    if (!row?.bytes) return null;
    // DO SQLite returns ArrayBuffer for BLOB columns
    return new Uint8Array(row.bytes as ArrayBuffer);
  }

  /** Whether a block with this CID is stored. */
  async has(cid: CID): Promise<boolean> {
    const rows = this.sql
      .exec("SELECT 1 FROM blocks WHERE cid = ? LIMIT 1", cid.toString())
      .toArray();
    return rows.length > 0;
  }

  /**
   * Look up several blocks at once.
   *
   * @returns the blocks that were found plus the CIDs that were not, each in
   *   the order the CIDs were requested.
   */
  async getBlocks(cids: CID[]): Promise<{ blocks: BlockMap; missing: CID[] }> {
    const blocks = new BlockMap();
    const missing: CID[] = [];
    for (const cid of cids) {
      const bytes = await this.getBytes(cid);
      if (bytes) {
        blocks.set(cid, bytes);
      } else {
        missing.push(cid);
      }
    }
    return { blocks, missing };
  }

  /** Insert a single block, replacing any existing row with the same CID. */
  async putBlock(cid: CID, block: Uint8Array, rev: string): Promise<void> {
    this.insertBlock(cid.toString(), block, rev);
  }

  /** Insert every block of `blocks`, all tagged with `rev`. */
  async putMany(blocks: BlockMap, rev: string): Promise<void> {
    for (const [cidStr, bytes] of SqliteRepoStorage.rawBlockEntries(blocks)) {
      this.insertBlock(cidStr, bytes, rev);
    }
  }

  /** Point the repo head at `cid` / `rev` without touching any blocks. */
  async updateRoot(cid: CID, rev: string): Promise<void> {
    this.sql.exec(
      "UPDATE repo_state SET root_cid = ?, rev = ? WHERE id = 1",
      cid.toString(),
      rev,
    );
  }

  /**
   * Persist a commit: insert its new blocks, delete its removed blocks, and
   * advance the repo head.
   *
   * The commit block is decoded *before* any write, so a malformed block
   * fails the call without leaving the head advanced. `prev_data_cid` is set
   * to the commit's `data` CID when the commit block is among `newBlocks`;
   * otherwise the stored value is kept. `lastCommit` is only updated once all
   * writes have been issued.
   */
  async applyCommit(commit: CommitData): Promise<void> {
    const commitCid = commit.cid.toString();
    const newBlocks = SqliteRepoStorage.rawBlockEntries(commit.newBlocks);
    const removed = SqliteRepoStorage.rawCidStrings(commit.removedCids);

    // Extract the data CID from the commit block up front (validate, then mutate).
    let dataCid: string | null = null;
    const commitBytes = newBlocks.get(commitCid);
    if (commitBytes) {
      const decoded = cborToLex(commitBytes) as { data?: CID } | null;
      if (decoded?.data) {
        dataCid = decoded.data.toString();
      }
    }

    // NOTE: no await between block inserts and root update - DO write coalescing
    for (const [cidStr, bytes] of newBlocks) {
      this.insertBlock(cidStr, bytes, commit.rev);
    }
    // Deletes run after inserts, so a CID present in both sets ends up removed.
    for (const cidStr of removed) {
      this.sql.exec("DELETE FROM blocks WHERE cid = ?", cidStr);
    }

    // COALESCE keeps the stored prev_data_cid when no data CID was extracted.
    this.sql.exec(
      "UPDATE repo_state SET root_cid = ?, rev = ?, prev_data_cid = COALESCE(?, prev_data_cid) WHERE id = 1",
      commitCid,
      commit.rev,
      dataCid,
    );

    this.lastCommit = commit;
  }

  /** Record a collection name; a name that is already present is ignored. */
  addCollection(collection: string): void {
    this.sql.exec(
      "INSERT OR IGNORE INTO collections (collection) VALUES (?)",
      collection,
    );
  }

  /** All recorded collection names, sorted ascending. */
  getCollections(): string[] {
    return this.sql
      .exec("SELECT collection FROM collections ORDER BY collection")
      .toArray()
      .map((row) => row.collection as string);
  }

  /** Record blob metadata; a CID that is already present is left unchanged. */
  insertBlob(cid: string, mimeType: string, size: number): void {
    this.sql.exec(
      "INSERT OR IGNORE INTO blobs (cid, mime_type, size) VALUES (?, ?, ?)",
      cid,
      mimeType,
      size,
    );
  }

  /**
   * List blob CIDs in ascending order, one page at a time.
   *
   * @param opts.limit  Page size (default 500). Fractions are floored and
   *   values below 1 are raised to 1: a zero limit used to crash when
   *   computing the cursor, and a negative one makes SQLite's `LIMIT`
   *   unbounded. A non-finite value falls back to the default.
   * @param opts.cursor Only CIDs strictly greater than this are returned.
   * @returns the page, plus a `cursor` (the last CID of the page) when more
   *   rows exist beyond it.
   */
  listBlobs(opts?: { limit?: number; cursor?: string }): { cids: string[]; cursor?: string } {
    const requested = opts?.limit ?? SqliteRepoStorage.DEFAULT_BLOB_PAGE_SIZE;
    const limit = Number.isFinite(requested)
      ? Math.max(1, Math.floor(requested))
      : SqliteRepoStorage.DEFAULT_BLOB_PAGE_SIZE;

    // Fetch one row beyond the page to learn whether another page exists.
    const rows = (
      opts?.cursor
        ? this.sql.exec(
            "SELECT cid FROM blobs WHERE cid > ? ORDER BY cid ASC LIMIT ?",
            opts.cursor,
            limit + 1,
          )
        : this.sql.exec("SELECT cid FROM blobs ORDER BY cid ASC LIMIT ?", limit + 1)
    ).toArray() as Array<{ cid: string }>;

    const hasMore = rows.length > limit;
    const page = hasMore ? rows.slice(0, limit) : rows;
    const last = page[page.length - 1];
    return {
      cids: page.map((row) => row.cid),
      cursor: hasMore ? last?.cid : undefined,
    };
  }

  /**
   * Get all blocks (used for CAR export).
   */
  getAllBlocks(): Array<{ cid: string; bytes: ArrayBuffer }> {
    return this.sql
      .exec("SELECT cid, bytes FROM blocks")
      .toArray() as Array<{ cid: string; bytes: ArrayBuffer }>;
  }

  /**
   * Count blocks (for testing).
   */
  async countBlocks(): Promise<number> {
    const [row] = this.sql
      .exec("SELECT COUNT(*) as count FROM blocks")
      .toArray();
    return (row?.count as number | undefined) ?? 0;
  }

  /**
   * Clear repo data (for testing).
   *
   * Deletes every block and resets the whole repo head — `root_cid`, `rev`,
   * and `prev_data_cid` — along with the in-memory `lastCommit`, so nothing
   * keeps pointing at the deleted repo. Account identity, collections, and
   * blob metadata are left in place.
   */
  async destroy(): Promise<void> {
    this.sql.exec("DELETE FROM blocks");
    this.sql.exec(
      "UPDATE repo_state SET root_cid = NULL, rev = NULL, prev_data_cid = NULL WHERE id = 1",
    );
    this.lastCommit = null;
  }

  /** Upsert one block row; shared by putBlock, putMany and applyCommit. */
  private insertBlock(cidStr: string, bytes: Uint8Array, rev: string): void {
    this.sql.exec(
      "INSERT OR REPLACE INTO blocks (cid, bytes, rev) VALUES (?, ?, ?)",
      cidStr,
      bytes,
      rev,
    );
  }

  /**
   * CID-string → bytes view of a BlockMap.
   *
   * Prefers BlockMap's internal map to avoid iterator issues in Workers. If
   * that private field is absent, falls back to the public `forEach` API
   * instead of silently yielding no blocks.
   */
  private static rawBlockEntries(blocks: BlockMap): Map<string, Uint8Array> {
    const internal = (blocks as unknown as { map?: Map<string, Uint8Array> }).map;
    if (internal) return internal;

    const copy = new Map<string, Uint8Array>();
    blocks.forEach((bytes, cid) => {
      copy.set(cid.toString(), bytes);
    });
    return copy;
  }

  /**
   * CID strings of a commit's removed set.
   *
   * Prefers CidSet's internal set for Workers compat; falls back to the
   * public `toList` API if that private field is absent.
   */
  private static rawCidStrings(cids: CommitData["removedCids"]): Iterable<string> {
    const internal = (cids as unknown as { set?: Set<string> }).set;
    if (internal) return internal;
    return cids.toList().map((cid) => cid.toString());
  }
}