Encrypted, ephemeral, private memos on atproto
1import {
2 produceRequirements,
3 type XRPCProcedures,
4 type XRPCQueries,
5} from "@cistern/shared";
6import { decryptText, generateKeys } from "@cistern/crypto";
7import { generateRandomName } from "@puregarlic/randimal";
8import { is, parse, type RecordKey } from "@atcute/lexicons";
9import { JetstreamSubscription } from "@atcute/jetstream";
10import type { Did } from "@atcute/lexicons/syntax";
11import type { Client, CredentialManager } from "@atcute/client";
12import { AppCisternMemo, type AppCisternPubkey } from "@cistern/lexicon";
13import type {
14 ConsumerOptions,
15 ConsumerParams,
16 DecryptedMemo,
17 LocalKeyPair,
18} from "./types.ts";
19
20/**
21 * Creates a `Consumer` instance with all necessary requirements. This is the recommended way to construct a `Consumer`.
22 *
23 * @description Resolves the user's DID using Slingshot, instantiates an `@atcute/client` instance, creates an initial session, and then returns a new Consumer.
24 * @param {ConsumerOptions} options - Information for constructing the underlying XRPC client
25 * @returns {Promise<Consumer>} A Cistern consumer client with an authorized session
26 */
27export async function createConsumer(
28 options: ConsumerOptions,
29): Promise<Consumer> {
30 const reqs = await produceRequirements(options);
31
32 return new Consumer(reqs);
33}
34
35/**
36 * Client for generating keys and decoding Cistern memos.
37 */
38export class Consumer {
39 /** DID of the user this consumer acts on behalf of */
40 did: Did;
41
42 /** `@atcute/client` instance with credential manager */
43 rpc: Client<XRPCQueries, XRPCProcedures>;
44
45 /** Private key used for decrypting and the AT URI of its associated public key */
46 keypair?: LocalKeyPair;
47
48 constructor(params: ConsumerParams) {
49 this.did = params.miniDoc.did;
50 this.keypair = params.options.keypair
51 ? {
52 privateKey: Uint8Array.fromBase64(params.options.keypair.privateKey),
53 publicKey: params.options.keypair.publicKey,
54 }
55 : undefined;
56 this.rpc = params.rpc;
57 }
58
59 /**
60 * Generates a key pair, uploading the public key to PDS and returning the pair.
61 */
62 async generateKeyPair(): Promise<LocalKeyPair> {
63 if (this.keypair) {
64 throw new Error("client already has a key pair");
65 }
66
67 const keys = generateKeys();
68 const name = await generateRandomName();
69
70 const record: AppCisternPubkey.Main = {
71 $type: "app.cistern.pubkey",
72 name,
73 algorithm: "x_wing",
74 content: { $bytes: keys.publicKey.toBase64() },
75 createdAt: new Date().toISOString(),
76 };
77 const res = await this.rpc.post("com.atproto.repo.createRecord", {
78 input: {
79 collection: "app.cistern.pubkey",
80 repo: this.did,
81 record,
82 },
83 });
84
85 if (!res.ok) {
86 throw new Error(
87 `failed to save public key: ${res.status} ${res.data.error}`,
88 );
89 }
90
91 const keypair = {
92 privateKey: keys.secretKey,
93 publicKey: res.data.uri,
94 };
95
96 this.keypair = keypair;
97
98 return keypair;
99 }
100
101 /**
102 * Asynchronously iterate through memos in the user's PDS
103 */
104 async *listMemos(): AsyncGenerator<
105 DecryptedMemo,
106 void,
107 undefined
108 > {
109 if (!this.keypair) {
110 throw new Error("no key pair set; generate a key before listing memos");
111 }
112
113 let cursor: string | undefined;
114
115 while (true) {
116 const res = await this.rpc.get("com.atproto.repo.listRecords", {
117 params: {
118 collection: "app.cistern.memo",
119 repo: this.did,
120 cursor,
121 },
122 });
123
124 if (!res.ok) {
125 throw new Error(
126 `failed to list memos: ${res.status} ${res.data.error}`,
127 );
128 }
129
130 cursor = res.data.cursor;
131
132 for (const record of res.data.records) {
133 const memo = parse(AppCisternMemo.mainSchema, record.value);
134
135 if (memo.pubkey !== this.keypair.publicKey) continue;
136
137 const decrypted = decryptText(this.keypair.privateKey, {
138 nonce: memo.nonce.$bytes,
139 cipherText: memo.ciphertext.$bytes,
140 content: memo.payload.$bytes,
141 hash: memo.contentHash.$bytes,
142 length: memo.contentLength,
143 });
144
145 yield {
146 tid: memo.tid,
147 text: decrypted,
148 };
149 }
150
151 if (!cursor) return;
152 }
153 }
154
155 /**
156 * Subscribes to the Jetstreams for the user's memos. Pass `"stop"` into `subscription.next(...)` to cancel
157 * @todo Allow specifying Jetstream endpoint
158 */
159 async *subscribeToMemos(): AsyncGenerator<
160 DecryptedMemo,
161 void,
162 "stop" | undefined
163 > {
164 if (!this.keypair) {
165 throw new Error("no key pair set; generate a key before subscribing");
166 }
167
168 const subscription = new JetstreamSubscription({
169 url: "wss://jetstream2.us-east.bsky.network",
170 wantedCollections: ["app.cistern.memo"],
171 wantedDids: [this.did],
172 });
173
174 for await (const event of subscription) {
175 if (event.kind === "commit" && event.commit.operation === "create") {
176 const record = event.commit.record;
177
178 if (!is(AppCisternMemo.mainSchema, record)) {
179 continue;
180 }
181
182 if (record.pubkey !== this.keypair.publicKey) {
183 continue;
184 }
185
186 const decrypted = decryptText(this.keypair.privateKey, {
187 nonce: record.nonce.$bytes,
188 cipherText: record.ciphertext.$bytes,
189 content: record.payload.$bytes,
190 hash: record.contentHash.$bytes,
191 length: record.contentLength,
192 });
193
194 const command = yield { tid: record.tid, text: decrypted };
195
196 if (command === "stop") return;
197 }
198 }
199 }
200
201 /**
202 * Deletes a memo from the user's PDS by record key.
203 */
204 async deleteMemo(key: RecordKey) {
205 const res = await this.rpc.post("com.atproto.repo.deleteRecord", {
206 input: {
207 collection: "app.cistern.memo",
208 repo: this.did,
209 rkey: key,
210 },
211 });
212
213 if (!res.ok) {
214 throw new Error(
215 `failed to delete memo ${key}: ${res.status} ${res.data.error}`,
216 );
217 }
218 }
219}