open-source, lexicon-agnostic PDS for AI agents. welcome-mat enrollment, AT Proto federation.
agents atprotocol pds cloudflare
7
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 308 lines 8.6 kB view raw
import { CID } from "@atproto/lex-data";
import {
  BlockMap,
  ReadableBlockstore,
  cborToLex,
  type CommitData,
  type RepoStorage,
} from "@atproto/repo";

/**
 * Shape of the single `repo_state` row (id = 1), as returned by
 * {@link SqliteRepoStorage.getState}. Field names mirror the column names.
 */
export interface AccountState {
  did: string;
  handle: string;
  signing_key_hex: string;
  signing_key_pub: string;
  rotation_key_hex: string;
  rotation_key_pub: string;
  jwk_thumbprint: string | null;
  root_cid: string | null;
  rev: string | null;
  prev_data_cid: string | null;
  active: number;
  created_at: string;
}

/** Upsert statement shared by every code path that writes a block row. */
const INSERT_BLOCK_SQL =
  "INSERT OR REPLACE INTO blocks (cid, bytes, rev) VALUES (?, ?, ?)";

/**
 * Return the string-keyed map that backs a BlockMap.
 *
 * The internal map is read directly (rather than through BlockMap's
 * iterators) to avoid iterator issues in Workers.
 *
 * @throws Error if the internal map is not present. Skipping the writes
 *   silently would let a caller advance the repo root to blocks that were
 *   never stored.
 */
function internalBlockEntries(blocks: BlockMap): Map<string, Uint8Array> {
  const entries = (blocks as unknown as { map?: Map<string, Uint8Array> }).map;
  if (!entries) {
    throw new Error(
      "BlockMap internal map is unavailable; refusing to silently skip block writes",
    );
  }
  return entries;
}

/**
 * RepoStorage implementation on top of a `SqlStorage` handle.
 *
 * Blocks live in the `blocks` table keyed by CID string; the repo head
 * (root CID, rev, previous data CID) and the account fields live in the
 * single-row `repo_state` table.
 */
export class SqliteRepoStorage
  extends ReadableBlockstore
  implements RepoStorage
{
  /** The commit most recently passed to {@link applyCommit}, if any. */
  lastCommit: CommitData | null = null;

  constructor(private sql: SqlStorage) {
    super();
  }

  /**
   * Initialize the database schema. Called once on DO startup.
   *
   * Every statement is idempotent (`IF NOT EXISTS` / `INSERT OR IGNORE`), and
   * the single `repo_state` row (id = 1) is created here so later writes can
   * be plain UPDATEs.
   */
  initSchema(): void {
    this.sql.exec(`
      CREATE TABLE IF NOT EXISTS blocks (
        cid TEXT PRIMARY KEY,
        bytes BLOB NOT NULL,
        rev TEXT NOT NULL
      );

      CREATE INDEX IF NOT EXISTS idx_blocks_rev ON blocks(rev);

      CREATE TABLE IF NOT EXISTS repo_state (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        did TEXT,
        handle TEXT,
        signing_key_hex TEXT,
        signing_key_pub TEXT,
        rotation_key_hex TEXT,
        rotation_key_pub TEXT,
        jwk_thumbprint TEXT,
        root_cid TEXT,
        rev TEXT,
        prev_data_cid TEXT,
        active INTEGER NOT NULL DEFAULT 1,
        created_at TEXT NOT NULL DEFAULT (datetime('now'))
      );

      INSERT OR IGNORE INTO repo_state (id) VALUES (1);

      CREATE TABLE IF NOT EXISTS collections (
        collection TEXT PRIMARY KEY
      );

      CREATE TABLE IF NOT EXISTS blobs (
        cid TEXT PRIMARY KEY,
        mime_type TEXT NOT NULL,
        size INTEGER NOT NULL,
        created_at TEXT NOT NULL DEFAULT (datetime('now'))
      );
    `);
  }

  /**
   * Set account-specific state in repo_state. Called during account provisioning.
   *
   * Only the identity and key columns are written; `root_cid`, `rev` and
   * `prev_data_cid` are left as they are.
   */
  initAccountState(opts: {
    did: string;
    handle: string;
    signing_key_hex: string;
    signing_key_pub: string;
    rotation_key_hex: string;
    rotation_key_pub: string;
    jwk_thumbprint: string | null;
  }): void {
    this.sql.exec(
      `UPDATE repo_state SET
        did = ?, handle = ?,
        signing_key_hex = ?, signing_key_pub = ?,
        rotation_key_hex = ?, rotation_key_pub = ?,
        jwk_thumbprint = ?
      WHERE id = 1`,
      opts.did,
      opts.handle,
      opts.signing_key_hex,
      opts.signing_key_pub,
      opts.rotation_key_hex,
      opts.rotation_key_pub,
      opts.jwk_thumbprint,
    );
  }

  /**
   * Get the full account state from repo_state.
   *
   * @returns The `repo_state` row, or `null` if the row does not exist.
   *   The row is cast, not validated: columns that were never set come back
   *   as `null` even where {@link AccountState} declares them non-nullable.
   */
  getState(): AccountState | null {
    const row = this.sql
      .exec("SELECT * FROM repo_state WHERE id = 1")
      .toArray()[0];
    if (!row) return null;
    return row as unknown as AccountState;
  }

  /** @returns The stored root CID, or `null` when none has been recorded. */
  async getRoot(): Promise<CID | null> {
    const row = this.sql
      .exec("SELECT root_cid FROM repo_state WHERE id = 1")
      .toArray()[0];
    const rootCid = row?.root_cid;
    if (!rootCid) return null;
    return CID.parse(rootCid as string);
  }

  /** @returns The stored revision string, or `null` when none has been recorded. */
  async getRev(): Promise<string | null> {
    const row = this.sql
      .exec("SELECT rev FROM repo_state WHERE id = 1")
      .toArray()[0];
    return (row?.rev as string | null | undefined) ?? null;
  }

  /** @returns The bytes stored for `cid`, or `null` when there is no such block. */
  async getBytes(cid: CID): Promise<Uint8Array | null> {
    const row = this.sql
      .exec("SELECT bytes FROM blocks WHERE cid = ?", cid.toString())
      .toArray()[0];
    const stored = row?.bytes;
    if (!stored) return null;
    // DO SQLite returns ArrayBuffer for BLOB columns
    return new Uint8Array(stored as ArrayBuffer);
  }

  /** @returns Whether a block row exists for `cid`. */
  async has(cid: CID): Promise<boolean> {
    const rows = this.sql
      .exec("SELECT 1 FROM blocks WHERE cid = ? LIMIT 1", cid.toString())
      .toArray();
    return rows.length > 0;
  }

  /**
   * Look up several blocks, one query per CID.
   *
   * @returns The blocks that were found plus the CIDs that were not, in input order.
   */
  async getBlocks(cids: CID[]): Promise<{ blocks: BlockMap; missing: CID[] }> {
    const blocks = new BlockMap();
    const missing: CID[] = [];
    for (const cid of cids) {
      const bytes = await this.getBytes(cid);
      if (bytes) {
        blocks.set(cid, bytes);
      } else {
        missing.push(cid);
      }
    }
    return { blocks, missing };
  }

  /** Insert a single block, replacing any existing row with the same CID. */
  async putBlock(cid: CID, block: Uint8Array, rev: string): Promise<void> {
    this.insertBlockRow(cid.toString(), block, rev);
  }

  /**
   * Insert every block of `blocks` under revision `rev`.
   *
   * @throws Error if the BlockMap's internal map cannot be read.
   */
  async putMany(blocks: BlockMap, rev: string): Promise<void> {
    for (const [cidStr, bytes] of internalBlockEntries(blocks)) {
      this.insertBlockRow(cidStr, bytes, rev);
    }
  }

  /** Point repo_state at a new root CID and revision. */
  async updateRoot(cid: CID, rev: string): Promise<void> {
    this.sql.exec(
      "UPDATE repo_state SET root_cid = ?, rev = ? WHERE id = 1",
      cid.toString(),
      rev,
    );
  }

  /**
   * Persist a commit: write its new blocks, delete its removed blocks, move
   * the root/rev to the commit, and record the commit's `data` CID as
   * `prev_data_cid`.
   *
   * @throws Error before anything is written if the new-block map cannot be
   *   read. The root is never advanced past blocks that were not stored.
   */
  async applyCommit(commit: CommitData): Promise<void> {
    // Resolve the block map first so a failure leaves storage and lastCommit untouched.
    const newBlocks = internalBlockEntries(commit.newBlocks);
    this.lastCommit = commit;

    for (const [cidStr, bytes] of newBlocks) {
      this.insertBlockRow(cidStr, bytes, commit.rev);
    }

    // Remove old blocks - access CidSet's internal set for Workers compat.
    // A missing set only skips deletions (extra rows remain), so it is tolerated.
    const removedSet = (commit.removedCids as unknown as { set?: Set<string> })
      .set;
    if (removedSet) {
      for (const cidStr of removedSet) {
        this.sql.exec("DELETE FROM blocks WHERE cid = ?", cidStr);
      }
    }

    // Update root
    // NOTE: no await between block inserts and root update - DO write coalescing
    this.sql.exec(
      "UPDATE repo_state SET root_cid = ?, rev = ? WHERE id = 1",
      commit.cid.toString(),
      commit.rev,
    );

    // Extract and store prev_data_cid from the commit block, when the commit
    // block is among the new blocks and decodes to an object with `data`.
    const commitBytes = newBlocks.get(commit.cid.toString());
    if (commitBytes) {
      const commitObj = cborToLex(commitBytes) as { data?: CID };
      if (commitObj.data) {
        this.sql.exec(
          "UPDATE repo_state SET prev_data_cid = ? WHERE id = 1",
          commitObj.data.toString(),
        );
      }
    }
  }

  /** Record a collection name; names that are already present are ignored. */
  addCollection(collection: string): void {
    this.sql.exec(
      "INSERT OR IGNORE INTO collections (collection) VALUES (?)",
      collection,
    );
  }

  /** @returns All recorded collection names, sorted by name. */
  getCollections(): string[] {
    return this.sql
      .exec("SELECT collection FROM collections ORDER BY collection")
      .toArray()
      .map((row) => row.collection as string);
  }

  /** Record blob metadata; a CID that is already present is left unchanged. */
  insertBlob(cid: string, mimeType: string, size: number): void {
    this.sql.exec(
      "INSERT OR IGNORE INTO blobs (cid, mime_type, size) VALUES (?, ?, ?)",
      cid,
      mimeType,
      size,
    );
  }

  /**
   * List blob CIDs in ascending CID order, one page at a time.
   *
   * @param opts.limit Maximum number of CIDs to return (default 500).
   * @param opts.cursor When set, only CIDs strictly greater than it are returned.
   * @returns The page of CIDs, plus `cursor` (the last CID of the page) when
   *   more rows exist beyond the page. A page with no CIDs never carries a cursor.
   */
  listBlobs(opts?: { limit?: number; cursor?: string }): { cids: string[]; cursor?: string } {
    const limit = opts?.limit ?? 500;
    // Fetch one row beyond the page so "is there more?" needs no second query.
    const fetched = (
      opts?.cursor
        ? this.sql
            .exec(
              "SELECT cid FROM blobs WHERE cid > ? ORDER BY cid ASC LIMIT ?",
              opts.cursor,
              limit + 1,
            )
            .toArray()
        : this.sql
            .exec("SELECT cid FROM blobs ORDER BY cid ASC LIMIT ?", limit + 1)
            .toArray()
    ) as Array<{ cid: string }>;

    const hasMore = fetched.length > limit;
    const page = hasMore ? fetched.slice(0, limit) : fetched;
    return {
      cids: page.map((r) => r.cid),
      // Optional chaining: with limit 0 the page is empty even though hasMore is true.
      cursor: hasMore ? page[page.length - 1]?.cid : undefined,
    };
  }

  /**
   * Get all blocks (used for CAR export).
   */
  getAllBlocks(): Array<{ cid: string; bytes: ArrayBuffer }> {
    return this.sql
      .exec("SELECT cid, bytes FROM blocks")
      .toArray() as Array<{ cid: string; bytes: ArrayBuffer }>;
  }

  /**
   * Count blocks (for testing).
   */
  async countBlocks(): Promise<number> {
    const row = this.sql
      .exec("SELECT COUNT(*) as count FROM blocks")
      .toArray()[0];
    return (row?.count as number | null | undefined) ?? 0;
  }

  /**
   * Clear repository data (for testing).
   *
   * Deletes every block and resets `root_cid` and `rev` to NULL. The other
   * `repo_state` columns (including `prev_data_cid`) and the `collections`
   * and `blobs` tables are not touched.
   */
  async destroy(): Promise<void> {
    this.sql.exec("DELETE FROM blocks");
    this.sql.exec(
      "UPDATE repo_state SET root_cid = NULL, rev = NULL WHERE id = 1",
    );
  }

  /** Write one block row, replacing any existing row with the same CID. */
  private insertBlockRow(cidStr: string, bytes: Uint8Array, rev: string): void {
    this.sql.exec(INSERT_BLOCK_SQL, cidStr, bytes, rev);
  }
}