import { produceRequirements, type XRPCProcedures, type XRPCQueries, } from "@cistern/shared"; import { decryptText, generateKeys } from "@cistern/crypto"; import { generateRandomName } from "@puregarlic/randimal"; import { is, isResourceUri, parse, type RecordKey, type ResourceUri, } from "@atcute/lexicons"; import { JetstreamSubscription } from "@atcute/jetstream"; import type { Did } from "@atcute/lexicons/syntax"; import type { Client } from "@atcute/client"; import { AppCisternMemo, type AppCisternPubkey } from "@cistern/lexicon"; import type { ConsumerOptions, ConsumerParams, DecryptedMemo, LocalKeyPair, } from "./types.ts"; /** * Client for generating keys and decoding Cistern memos. */ export class Consumer { /** DID of the user this consumer acts on behalf of */ did: Did; /** `@atcute/client` instance with credential manager */ rpc: Client; /** Private key used for decrypting and the AT URI of its associated public key */ keypair?: LocalKeyPair; constructor(params: ConsumerParams) { this.did = params.miniDoc.did; this.keypair = params.options.keypair ? { privateKey: Uint8Array.fromBase64(params.options.keypair.privateKey), publicKey: params.options.keypair.publicKey as ResourceUri, } : undefined; this.rpc = params.rpc; } /** * Generates a key pair, uploading the public key to PDS and returning the pair. */ async generateKeyPair(): Promise { if (this.keypair) { throw new Error("client already has a key pair"); } const keys = generateKeys(); const name = await generateRandomName(); const record: AppCisternPubkey.Main = { $type: "app.cistern.pubkey", name, algorithm: "x_wing", content: { $bytes: keys.publicKey.toBase64() }, createdAt: new Date().toISOString(), }; const res = await this.rpc.post("com.atproto.repo.createRecord", { input: { collection: "app.cistern.pubkey", repo: this.did, record, }, }); if (!res.ok) { throw new Error( `failed to save public key: ${res.status} ${res.data.error}`, ); } const keypair = { privateKey: keys.secretKey, publicKey: res.data.uri, }; this.keypair = keypair; return keypair; } /** * Asynchronously iterate through memos in the user's PDS */ async *listMemos(): AsyncGenerator< DecryptedMemo, void, undefined > { if (!this.keypair) { throw new Error("no key pair set; generate a key before listing memos"); } let cursor: string | undefined; while (true) { const res = await this.rpc.get("com.atproto.repo.listRecords", { params: { collection: "app.cistern.memo", repo: this.did, cursor, }, }); if (!res.ok) { throw new Error( `failed to list memos: ${res.status} ${res.data.error}`, ); } cursor = res.data.cursor; for (const record of res.data.records) { const memo = parse(AppCisternMemo.mainSchema, record.value); if (memo.pubkey !== this.keypair.publicKey) continue; const decrypted = decryptText(this.keypair.privateKey, { nonce: memo.nonce.$bytes, cipherText: memo.ciphertext.$bytes, content: memo.payload.$bytes, hash: memo.contentHash.$bytes, length: memo.contentLength, }); yield { key: record.uri.split("/").pop() as RecordKey, tid: memo.tid, text: decrypted, }; } if (!cursor) return; } } /** * Subscribes to the Jetstreams for the user's memos. Pass `"stop"` into `subscription.next(...)` to cancel * @todo Allow specifying Jetstream endpoint */ async *subscribeToMemos(): AsyncGenerator< DecryptedMemo, void, "stop" | undefined > { if (!this.keypair) { throw new Error("no key pair set; generate a key before subscribing"); } const subscription = new JetstreamSubscription({ url: "wss://jetstream2.us-east.bsky.network", wantedCollections: ["app.cistern.memo"], wantedDids: [this.did], }); for await (const event of subscription) { if (event.kind === "commit" && event.commit.operation === "create") { const record = event.commit.record; if (!is(AppCisternMemo.mainSchema, record)) { continue; } if (record.pubkey !== this.keypair.publicKey) { continue; } const decrypted = decryptText(this.keypair.privateKey, { nonce: record.nonce.$bytes, cipherText: record.ciphertext.$bytes, content: record.payload.$bytes, hash: record.contentHash.$bytes, length: record.contentLength, }); const command = yield { key: event.commit.rkey, tid: record.tid, text: decrypted, }; if (command === "stop") return; } } } /** * Deletes a memo from the user's PDS by record key. */ async deleteMemo(key: RecordKey) { const res = await this.rpc.post("com.atproto.repo.deleteRecord", { input: { collection: "app.cistern.memo", repo: this.did, rkey: key, }, }); if (!res.ok) { throw new Error( `failed to delete memo ${key}: ${res.status} ${res.data.error}`, ); } } } /** * Creates a `Consumer` instance with all necessary requirements. This is the recommended way to construct a `Consumer`. * * @description Resolves the user's DID using Slingshot, instantiates an `@atcute/client` instance, creates an initial session, and then returns a new Consumer. * @param {ConsumerOptions} options - Information for constructing the underlying XRPC client * @returns {Promise} A Cistern consumer client with an authorized session */ export async function createConsumer( options: ConsumerOptions, ): Promise { const reqs = await produceRequirements(options); if (options.keypair && !isResourceUri(options.keypair.publicKey)) { throw new Error("provided public key is not a valid AT URI"); } return new Consumer(reqs); }