import type Database from "better-sqlite3"; import { Repo, WriteOpAction, BlockMap, blocksToCarFile, readCarWithRoot, getRecords, type RecordCreateOp, type RecordUpdateOp, type RecordDeleteOp, type RecordWriteOp, } from "@atproto/repo"; type RepoRecord = Record; import { Secp256k1Keypair } from "@atproto/crypto"; import { CID, asCid, isBlobRef } from "@atproto/lex-data"; import { now as tidNow } from "@atcute/tid"; import { encode as cborEncode } from "./cbor-compat.js"; import { SqliteRepoStorage } from "./storage.js"; import { Sequencer, type SeqEvent, type SeqCommitEvent, type SeqIdentityEvent, type CommitData, } from "./sequencer.js"; import { BlobStore, type BlobRef } from "./blobs.js"; import { jsonToLex, type JsonValue } from "@atproto/lex-json"; import type { Config } from "./config.js"; import type { BlockStore, NetworkService } from "./ipfs.js"; /** * RepoManager - manages a single user's AT Protocol repository. * * This is the Node.js equivalent of Cirrus's AccountDurableObject, * converted from a Cloudflare Durable Object to a plain class. */ export class RepoManager { storage: SqliteRepoStorage; private repo: Repo | null = null; private keypair: Secp256k1Keypair | null = null; sequencer: Sequencer; blobStore: BlobStore | null = null; blockStore: BlockStore | null = null; networkService: NetworkService | null = null; private repoInitialized = false; /** Callback invoked when a firehose event is produced */ onFirehoseEvent?: (event: SeqEvent) => void; constructor( private db: Database.Database, private config: Config, ) { this.storage = new SqliteRepoStorage(db); this.sequencer = new Sequencer(db); } /** * Initialize storage schema and optionally the blob store. */ init( blobStore?: BlobStore, blockStore?: BlockStore, networkService?: NetworkService, ): void { this.storage.initSchema(true); if (blobStore) { this.blobStore = blobStore; } if (blockStore) { this.blockStore = blockStore; } if (networkService) { this.networkService = networkService; } } /** * Initialize the Repo instance. Called lazily on first repo access. */ private async ensureRepoInitialized(): Promise { if (this.repoInitialized) return; if (!this.config.DID || !this.config.SIGNING_KEY) { throw new Error("RepoManager requires DID and SIGNING_KEY to be configured"); } this.keypair = await Secp256k1Keypair.import(this.config.SIGNING_KEY); const root = await this.storage.getRoot(); if (root) { this.repo = await Repo.load(this.storage, root); } else { this.repo = await Repo.create( this.storage, this.config.DID, this.keypair, ); } this.repoInitialized = true; } async getRepo(): Promise { await this.ensureRepoInitialized(); return this.repo!; } async getKeypair(): Promise { await this.ensureRepoInitialized(); return this.keypair!; } async ensureActive(): Promise { const isActive = await this.storage.getActive(); if (!isActive) { throw new Error( "AccountDeactivated: Account is deactivated. Call activateAccount to enable writes.", ); } } /** * Get new blocks for the current revision from the database. */ private getNewBlocksForRev(rev: string): BlockMap { const newBlocks = new BlockMap(); const rows = this.db .prepare("SELECT cid, bytes FROM blocks WHERE rev = ?") .all(rev) as Array<{ cid: string; bytes: Buffer }>; for (const row of rows) { const cid = CID.parse(row.cid); const bytes = new Uint8Array(row.bytes); newBlocks.set(cid, bytes); } return newBlocks; } /** * Sequence a commit and broadcast to firehose listeners. */ private async sequenceAndBroadcast( prevRev: string, ops: Array, ): Promise { const newBlocks = this.getNewBlocksForRev(this.repo!.commit.rev); const commitData: CommitData = { did: this.repo!.did, commit: this.repo!.cid, rev: this.repo!.commit.rev, since: prevRev, newBlocks, ops, }; const event = await this.sequencer.sequenceCommit(commitData); this.onFirehoseEvent?.(event); // Fire-and-forget: push new blocks to IPFS (never blocks commit path) if (this.blockStore) { const store = this.blockStore; const net = this.networkService; store .putBlocks(newBlocks) .then(() => { if (!net) return; const cidStrs: string[] = []; const map = ( newBlocks as unknown as { map: Map; } ).map; if (map) { for (const cidStr of map.keys()) { cidStrs.push(cidStr); } } return net.provideBlocks(cidStrs).then(() => { net.publishCommitNotification( commitData.did, commitData.commit.toString(), commitData.rev, ).catch(() => {}); }); }) .catch(() => {}); } } // ============================================ // Repo Operations // ============================================ async describeRepo(): Promise<{ did: string; collections: string[]; cid: string; }> { const repo = await this.getRepo(); if (!this.storage.hasCollections() && (await this.storage.getRoot())) { const seen = new Set(); for await (const record of repo.walkRecords()) { if (!seen.has(record.collection)) { seen.add(record.collection); this.storage.addCollection(record.collection); } } } return { did: repo.did, collections: this.storage.getCollections(), cid: repo.cid.toString(), }; } async getRecord( collection: string, rkey: string, ): Promise<{ cid: string; record: unknown } | null> { const repo = await this.getRepo(); const dataKey = `${collection}/${rkey}`; const recordCid = await repo.data.get(dataKey); if (!recordCid) return null; const record = await repo.getRecord(collection, rkey); if (!record) return null; return { cid: recordCid.toString(), record: serializeRecord(record), }; } async listRecords( collection: string, opts: { limit: number; cursor?: string; reverse?: boolean }, ): Promise<{ records: Array<{ uri: string; cid: string; value: unknown }>; cursor?: string; }> { const repo = await this.getRepo(); const records = []; const startFrom = opts.cursor || `${collection}/`; for await (const record of repo.walkRecords(startFrom)) { if (record.collection !== collection) { if (records.length > 0) break; continue; } records.push({ uri: `at://${repo.did}/${record.collection}/${record.rkey}`, cid: record.cid.toString(), value: serializeRecord(record.record), }); if (records.length >= opts.limit + 1) break; } if (opts.reverse) { records.reverse(); } const hasMore = records.length > opts.limit; const results = hasMore ? records.slice(0, opts.limit) : records; const cursor = hasMore ? `${collection}/${results[results.length - 1]?.uri.split("/").pop() ?? ""}` : undefined; return { records: results, cursor }; } async createRecord( collection: string, rkey: string | undefined, record: unknown, ): Promise<{ uri: string; cid: string; commit: { cid: string; rev: string }; }> { await this.ensureActive(); const repo = await this.getRepo(); const keypair = await this.getKeypair(); const actualRkey = rkey || tidNow(); const createOp: RecordCreateOp = { action: WriteOpAction.Create, collection, rkey: actualRkey, record: jsonToLex(record as JsonValue) as RepoRecord, }; const prevRev = repo.commit.rev; const updatedRepo = await repo.applyWrites([createOp], keypair); this.repo = updatedRepo; const dataKey = `${collection}/${actualRkey}`; const recordCid = await this.repo.data.get(dataKey); if (!recordCid) { throw new Error(`Failed to create record: ${collection}/${actualRkey}`); } this.storage.addCollection(collection); const opWithCid = { ...createOp, cid: recordCid }; await this.sequenceAndBroadcast(prevRev, [opWithCid]); return { uri: `at://${this.repo.did}/${collection}/${actualRkey}`, cid: recordCid.toString(), commit: { cid: this.repo.cid.toString(), rev: this.repo.commit.rev, }, }; } async deleteRecord( collection: string, rkey: string, ): Promise<{ commit: { cid: string; rev: string } } | null> { await this.ensureActive(); const repo = await this.getRepo(); const keypair = await this.getKeypair(); const existing = await repo.getRecord(collection, rkey); if (!existing) return null; const deleteOp: RecordDeleteOp = { action: WriteOpAction.Delete, collection, rkey, }; const prevRev = repo.commit.rev; const updatedRepo = await repo.applyWrites([deleteOp], keypair); this.repo = updatedRepo; await this.sequenceAndBroadcast(prevRev, [deleteOp]); return { commit: { cid: updatedRepo.cid.toString(), rev: updatedRepo.commit.rev, }, }; } async putRecord( collection: string, rkey: string, record: unknown, ): Promise<{ uri: string; cid: string; commit: { cid: string; rev: string }; validationStatus: string; }> { await this.ensureActive(); const repo = await this.getRepo(); const keypair = await this.getKeypair(); const existing = await repo.getRecord(collection, rkey); const isUpdate = existing !== null; const normalizedRecord = jsonToLex(record as JsonValue) as RepoRecord; const op: RecordWriteOp = isUpdate ? ({ action: WriteOpAction.Update, collection, rkey, record: normalizedRecord, } as RecordUpdateOp) : ({ action: WriteOpAction.Create, collection, rkey, record: normalizedRecord, } as RecordCreateOp); const prevRev = repo.commit.rev; const updatedRepo = await repo.applyWrites([op], keypair); this.repo = updatedRepo; const dataKey = `${collection}/${rkey}`; const recordCid = await this.repo.data.get(dataKey); if (!recordCid) { throw new Error(`Failed to put record: ${collection}/${rkey}`); } this.storage.addCollection(collection); const opWithCid = { ...op, cid: recordCid }; await this.sequenceAndBroadcast(prevRev, [opWithCid]); return { uri: `at://${this.repo.did}/${collection}/${rkey}`, cid: recordCid.toString(), commit: { cid: this.repo.cid.toString(), rev: this.repo.commit.rev, }, validationStatus: "valid", }; } async applyWrites( writes: Array<{ $type: string; collection: string; rkey?: string; value?: unknown; }>, ): Promise<{ commit: { cid: string; rev: string }; results: Array<{ $type: string; uri?: string; cid?: string; validationStatus?: string; }>; }> { await this.ensureActive(); const repo = await this.getRepo(); const keypair = await this.getKeypair(); const ops: RecordWriteOp[] = []; const results: Array<{ $type: string; collection: string; rkey: string; action: WriteOpAction; }> = []; for (const write of writes) { if (write.$type === "com.atproto.repo.applyWrites#create") { const rkey = write.rkey || tidNow(); const op: RecordCreateOp = { action: WriteOpAction.Create, collection: write.collection, rkey, record: jsonToLex(write.value as JsonValue) as RepoRecord, }; ops.push(op); results.push({ $type: "com.atproto.repo.applyWrites#createResult", collection: write.collection, rkey, action: WriteOpAction.Create, }); } else if (write.$type === "com.atproto.repo.applyWrites#update") { if (!write.rkey) throw new Error("Update requires rkey"); const op: RecordUpdateOp = { action: WriteOpAction.Update, collection: write.collection, rkey: write.rkey, record: jsonToLex(write.value as JsonValue) as RepoRecord, }; ops.push(op); results.push({ $type: "com.atproto.repo.applyWrites#updateResult", collection: write.collection, rkey: write.rkey, action: WriteOpAction.Update, }); } else if (write.$type === "com.atproto.repo.applyWrites#delete") { if (!write.rkey) throw new Error("Delete requires rkey"); const op: RecordDeleteOp = { action: WriteOpAction.Delete, collection: write.collection, rkey: write.rkey, }; ops.push(op); results.push({ $type: "com.atproto.repo.applyWrites#deleteResult", collection: write.collection, rkey: write.rkey, action: WriteOpAction.Delete, }); } else { throw new Error(`Unknown write type: ${write.$type}`); } } const prevRev = repo.commit.rev; const updatedRepo = await repo.applyWrites(ops, keypair); this.repo = updatedRepo; for (const op of ops) { if (op.action !== WriteOpAction.Delete) { this.storage.addCollection(op.collection); } } const finalResults: Array<{ $type: string; uri?: string; cid?: string; validationStatus?: string; }> = []; const opsWithCids: Array = []; for (let i = 0; i < results.length; i++) { const result = results[i]!; const op = ops[i]!; if (result.action === WriteOpAction.Delete) { finalResults.push({ $type: result.$type }); opsWithCids.push(op); } else { const dataKey = `${result.collection}/${result.rkey}`; const recordCid = await this.repo.data.get(dataKey); finalResults.push({ $type: result.$type, uri: `at://${this.repo.did}/${result.collection}/${result.rkey}`, cid: recordCid?.toString(), validationStatus: "valid", }); opsWithCids.push({ ...op, cid: recordCid }); } } await this.sequenceAndBroadcast(prevRev, opsWithCids); return { commit: { cid: this.repo.cid.toString(), rev: this.repo.commit.rev, }, results: finalResults, }; } // ============================================ // Repo Export/Import // ============================================ async getRepoStatus(): Promise<{ did: string; head: string; rev: string; }> { const repo = await this.getRepo(); return { did: repo.did, head: repo.cid.toString(), rev: repo.commit.rev, }; } async getRepoCar(): Promise { const root = await this.storage.getRoot(); if (!root) throw new Error("No repository root found"); const rows = this.db .prepare("SELECT cid, bytes FROM blocks") .all() as Array<{ cid: string; bytes: Buffer }>; const blocks = new BlockMap(); for (const row of rows) { const cid = CID.parse(row.cid); const bytes = new Uint8Array(row.bytes); blocks.set(cid, bytes); } return blocksToCarFile(root, blocks); } async getBlocks(cids: string[]): Promise { const root = await this.storage.getRoot(); if (!root) throw new Error("No repository root found"); const blocks = new BlockMap(); for (const cidStr of cids) { const cid = CID.parse(cidStr); const bytes = await this.storage.getBytes(cid); if (bytes) { blocks.set(cid, bytes); } } return blocksToCarFile(root, blocks); } async getRecordProof( collection: string, rkey: string, ): Promise { const root = await this.storage.getRoot(); if (!root) throw new Error("No repository root found"); const carChunks: Uint8Array[] = []; for await (const chunk of getRecords(this.storage, root, [ { collection, rkey }, ])) { carChunks.push(chunk); } const totalLength = carChunks.reduce((acc, chunk) => acc + chunk.length, 0); const result = new Uint8Array(totalLength); let offset = 0; for (const chunk of carChunks) { result.set(chunk, offset); offset += chunk.length; } return result; } async importRepo(carBytes: Uint8Array): Promise<{ did: string; rev: string; cid: string; }> { const isActive = await this.storage.getActive(); const existingRoot = await this.storage.getRoot(); if (isActive && existingRoot) { throw new Error( "Repository already exists. Cannot import over existing repository.", ); } if (existingRoot) { await this.storage.destroy(); this.repo = null; this.repoInitialized = false; } const { root: rootCid, blocks } = await readCarWithRoot(carBytes); const importRev = tidNow(); await this.storage.putMany(blocks, importRev); if (!this.config.DID || !this.config.SIGNING_KEY) { throw new Error("RepoManager requires DID and SIGNING_KEY for import"); } this.keypair = await Secp256k1Keypair.import(this.config.SIGNING_KEY); this.repo = await Repo.load(this.storage, rootCid); await this.storage.updateRoot(rootCid, this.repo.commit.rev); if (this.repo.did !== this.config.DID) { await this.storage.destroy(); throw new Error( `DID mismatch: CAR file contains DID ${this.repo.did}, but expected ${this.config.DID}`, ); } this.repoInitialized = true; const seenCollections = new Set(); for await (const record of this.repo.walkRecords()) { if (!seenCollections.has(record.collection)) { seenCollections.add(record.collection); this.storage.addCollection(record.collection); } const blobCids = extractBlobCids(record.record); if (blobCids.length > 0) { const uri = `at://${this.repo.did}/${record.collection}/${record.rkey}`; this.storage.addRecordBlobs(uri, blobCids); } } return { did: this.repo.did, rev: this.repo.commit.rev, cid: rootCid.toString(), }; } // ============================================ // Blob Operations // ============================================ async uploadBlob(bytes: Uint8Array, mimeType: string): Promise { if (!this.blobStore) { throw new Error("Blob storage not configured"); } const MAX_BLOB_SIZE = 60 * 1024 * 1024; if (bytes.length > MAX_BLOB_SIZE) { throw new Error( `Blob too large: ${bytes.length} bytes (max ${MAX_BLOB_SIZE})`, ); } const blobRef = await this.blobStore.putBlob(bytes, mimeType); this.storage.trackImportedBlob(blobRef.ref.$link, bytes.length, mimeType); // Fire-and-forget: push blob to IPFS if (this.blockStore) { this.blockStore .putBlock(blobRef.ref.$link, bytes) .catch(() => {}); } return blobRef; } // ============================================ // Preferences // ============================================ async getPreferences(): Promise<{ preferences: unknown[] }> { const preferences = await this.storage.getPreferences(); return { preferences }; } async putPreferences(preferences: unknown[]): Promise { await this.storage.putPreferences(preferences); } // ============================================ // Account State // ============================================ async getEmail(): Promise<{ email: string | null }> { return { email: this.storage.getEmail() }; } async updateEmail(email: string): Promise { this.storage.setEmail(email); } async getActive(): Promise { return this.storage.getActive(); } async activateAccount(): Promise { await this.storage.setActive(true); } async deactivateAccount(): Promise { await this.storage.setActive(false); } // ============================================ // Migration Progress // ============================================ async countBlocks(): Promise { return this.storage.countBlocks(); } async countRecords(): Promise { const repo = await this.getRepo(); let count = 0; for await (const _record of repo.walkRecords()) { count++; } return count; } async countExpectedBlobs(): Promise { return this.storage.countExpectedBlobs(); } async countImportedBlobs(): Promise { return this.storage.countImportedBlobs(); } async listMissingBlobs( limit: number = 500, cursor?: string, ): Promise<{ blobs: Array<{ cid: string; recordUri: string }>; cursor?: string; }> { return this.storage.listMissingBlobs(limit, cursor); } async resetMigration(): Promise<{ blocksDeleted: number; blobsCleared: number; }> { const isActive = await this.storage.getActive(); if (isActive) { throw new Error( "AccountActive: Cannot reset migration on an active account. Deactivate first.", ); } const blocksDeleted = await this.storage.countBlocks(); const blobsCleared = this.storage.countImportedBlobs(); await this.storage.destroy(); this.storage.clearBlobTracking(); this.repo = null; this.repoInitialized = false; return { blocksDeleted, blobsCleared }; } // ============================================ // Identity Events // ============================================ async emitIdentityEvent(handle: string): Promise<{ seq: number }> { const time = new Date().toISOString(); const result = this.db .prepare( `INSERT INTO firehose_events (event_type, payload) VALUES ('identity', ?)`, ) .run(Buffer.alloc(0)); const seq = Number(result.lastInsertRowid); const event: SeqIdentityEvent = { seq, type: "identity", event: { seq, did: this.config.DID ?? "", time, handle, }, time, }; this.onFirehoseEvent?.(event); return { seq }; } // ============================================ // Health // ============================================ healthCheck(): { ok: true } { this.db.prepare("SELECT 1").get(); return { ok: true }; } getFirehoseStatus(subscriberCount: number): { subscribers: number; latestSeq: number | null; } { const seq = this.sequencer.getLatestSeq(); return { subscribers: subscriberCount, latestSeq: seq || null, }; } } // ============================================ // Utility Functions // ============================================ /** * Serialize a record for JSON by converting CID objects to { $link: "..." } format. */ function serializeRecord(obj: unknown): unknown { if (obj === null || obj === undefined) return obj; const cid = asCid(obj); if (cid) { return { $link: cid.toString() }; } if (obj instanceof Uint8Array) { let binary = ""; for (let i = 0; i < obj.length; i++) { binary += String.fromCharCode(obj[i]!); } return { $bytes: btoa(binary) }; } if (Array.isArray(obj)) { return obj.map(serializeRecord); } if (typeof obj === "object") { const result: Record = {}; for (const [key, value] of Object.entries(obj)) { result[key] = serializeRecord(value); } return result; } return obj; } /** * Extract blob CIDs from a record by recursively searching for blob references. */ export function extractBlobCids(obj: unknown): string[] { const cids: string[] = []; function walk(value: unknown): void { if (value === null || value === undefined) return; if (isBlobRef(value)) { cids.push(value.ref.toString()); return; } if (Array.isArray(value)) { for (const item of value) { walk(item); } } else if (typeof value === "object") { for (const key of Object.keys(value as Record)) { walk((value as Record)[key]); } } } walk(obj); return cids; }