Encrypted, ephemeral, private memos on atproto
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 4e4d4e463e6e2bfddd2041bb70b1825dfc7b2c9a 219 lines 6.0 kB view raw
import {
  produceRequirements,
  type XRPCProcedures,
  type XRPCQueries,
} from "@cistern/shared";
import { decryptText, generateKeys } from "@cistern/crypto";
import { generateRandomName } from "@puregarlic/randimal";
import { is, parse, type RecordKey } from "@atcute/lexicons";
import { JetstreamSubscription } from "@atcute/jetstream";
import type { Did } from "@atcute/lexicons/syntax";
import type { Client, CredentialManager } from "@atcute/client";
import { AppCisternMemo, type AppCisternPubkey } from "@cistern/lexicon";
import type {
  ConsumerOptions,
  ConsumerParams,
  DecryptedMemo,
  LocalKeyPair,
} from "./types.ts";

/** Jetstream endpoint used by `Consumer.subscribeToMemos` when the caller does not supply one. */
const DEFAULT_JETSTREAM_URL = "wss://jetstream2.us-east.bsky.network";

/**
 * Decrypts a single, already-validated memo record.
 *
 * Shared by `listMemos` and `subscribeToMemos` so the mapping from record
 * fields to `decryptText` inputs lives in exactly one place.
 *
 * @param keypair - Key pair whose private key is used for decryption
 * @param memo - Memo record that has passed schema validation
 * @returns The memo's `tid` together with its decrypted text
 */
function decryptMemo(
  keypair: LocalKeyPair,
  memo: AppCisternMemo.Main,
): DecryptedMemo {
  const text = decryptText(keypair.privateKey, {
    nonce: memo.nonce.$bytes,
    cipherText: memo.ciphertext.$bytes,
    content: memo.payload.$bytes,
    hash: memo.contentHash.$bytes,
    length: memo.contentLength,
  });

  return { tid: memo.tid, text };
}

/**
 * Creates a `Consumer` instance with all necessary requirements. This is the recommended way to construct a `Consumer`.
 *
 * @description Awaits `produceRequirements(options)` (which, per the original author, resolves the user's DID using Slingshot, instantiates an `@atcute/client` instance and creates an initial session) and hands the result to the `Consumer` constructor.
 * @param options - Information for constructing the underlying XRPC client
 * @returns A Cistern consumer client with an authorized session
 */
export async function createConsumer(
  options: ConsumerOptions,
): Promise<Consumer> {
  const reqs = await produceRequirements(options);

  return new Consumer(reqs);
}

/**
 * Client for generating keys and decoding Cistern memos.
 */
export class Consumer {
  /** DID of the user this consumer acts on behalf of */
  did: Did;

  /** `@atcute/client` instance with credential manager */
  rpc: Client<XRPCQueries, XRPCProcedures>;

  /** Private key used for decrypting and the AT URI of its associated public key */
  keypair?: LocalKeyPair;

  /**
   * @param params - Resolved requirements; `params.miniDoc.did` becomes `did`,
   *   `params.rpc` becomes `rpc`, and an optional `params.options.keypair` is
   *   adopted after decoding its base64 private key into bytes.
   */
  constructor(params: ConsumerParams) {
    this.did = params.miniDoc.did;
    this.keypair = params.options.keypair
      ? {
        privateKey: Uint8Array.fromBase64(params.options.keypair.privateKey),
        publicKey: params.options.keypair.publicKey,
      }
      : undefined;
    this.rpc = params.rpc;
  }

  /**
   * Generates a key pair, uploading the public key to PDS and returning the pair.
   *
   * The public key is stored as an `app.cistern.pubkey` record under a randomly
   * generated name; the returned `publicKey` is the URI of that record. The new
   * pair is also stored on `this.keypair`.
   *
   * @returns The freshly generated key pair
   * @throws {Error} If this client already has a key pair
   * @throws {Error} If the `createRecord` request is not successful
   */
  async generateKeyPair(): Promise<LocalKeyPair> {
    if (this.keypair) {
      throw new Error("client already has a key pair");
    }

    const keys = generateKeys();
    const name = await generateRandomName();

    const record: AppCisternPubkey.Main = {
      $type: "app.cistern.pubkey",
      name,
      algorithm: "x_wing",
      content: { $bytes: keys.publicKey.toBase64() },
      createdAt: new Date().toISOString(),
    };
    const res = await this.rpc.post("com.atproto.repo.createRecord", {
      input: {
        collection: "app.cistern.pubkey",
        repo: this.did,
        record,
      },
    });

    if (!res.ok) {
      throw new Error(
        `failed to save public key: ${res.status} ${res.data.error}`,
      );
    }

    const keypair = {
      privateKey: keys.secretKey,
      publicKey: res.data.uri,
    };

    this.keypair = keypair;

    return keypair;
  }

  /**
   * Asynchronously iterate through memos in the user's PDS.
   *
   * Pages through `com.atproto.repo.listRecords` until no cursor is returned.
   * Records that do not match the memo schema, or that were encrypted for a
   * different public key, are skipped.
   *
   * @returns An async generator yielding each decrypted memo
   * @throws {Error} If no key pair is set
   * @throws {Error} If a `listRecords` request is not successful
   */
  async *listMemos(): AsyncGenerator<
    DecryptedMemo,
    void,
    undefined
  > {
    if (!this.keypair) {
      throw new Error("no key pair set; generate a key before listing memos");
    }

    let cursor: string | undefined;

    while (true) {
      const res = await this.rpc.get("com.atproto.repo.listRecords", {
        params: {
          collection: "app.cistern.memo",
          repo: this.did,
          cursor,
        },
      });

      if (!res.ok) {
        throw new Error(
          `failed to list memos: ${res.status} ${res.data.error}`,
        );
      }

      cursor = res.data.cursor;

      for (const record of res.data.records) {
        const memo = record.value;

        // Skip rather than throw on a malformed record, matching
        // `subscribeToMemos`; otherwise a single bad record in the collection
        // would make every listing fail.
        if (!is(AppCisternMemo.mainSchema, memo)) continue;

        if (memo.pubkey !== this.keypair.publicKey) continue;

        yield decryptMemo(this.keypair, memo);
      }

      if (!cursor) return;
    }
  }

  /**
   * Subscribes to the Jetstreams for the user's memos. Pass `"stop"` into `subscription.next(...)` to cancel.
   *
   * Only `create` commits in `app.cistern.memo` for this user's DID are
   * considered; records that fail schema validation or target a different
   * public key are skipped. Note that, as with any generator, the argument of
   * the very first `next(...)` call is discarded, so `"stop"` only takes effect
   * when sent after a memo has been yielded.
   *
   * @param url - Jetstream WebSocket endpoint to connect to
   * @returns An async generator yielding each decrypted memo as it arrives
   * @throws {Error} If no key pair is set
   */
  async *subscribeToMemos(
    url: string = DEFAULT_JETSTREAM_URL,
  ): AsyncGenerator<
    DecryptedMemo,
    void,
    "stop" | undefined
  > {
    if (!this.keypair) {
      throw new Error("no key pair set; generate a key before subscribing");
    }

    const subscription = new JetstreamSubscription({
      url,
      wantedCollections: ["app.cistern.memo"],
      wantedDids: [this.did],
    });

    for await (const event of subscription) {
      if (event.kind === "commit" && event.commit.operation === "create") {
        const record = event.commit.record;

        if (!is(AppCisternMemo.mainSchema, record)) {
          continue;
        }

        if (record.pubkey !== this.keypair.publicKey) {
          continue;
        }

        const command = yield decryptMemo(this.keypair, record);

        if (command === "stop") return;
      }
    }
  }

  /**
   * Deletes a memo from the user's PDS by record key.
   *
   * @param key - Record key of the `app.cistern.memo` record to delete
   * @throws {Error} If the `deleteRecord` request is not successful
   */
  async deleteMemo(key: RecordKey): Promise<void> {
    const res = await this.rpc.post("com.atproto.repo.deleteRecord", {
      input: {
        collection: "app.cistern.memo",
        repo: this.did,
        rkey: key,
      },
    });

    if (!res.ok) {
      throw new Error(
        `failed to delete memo ${key}: ${res.status} ${res.data.error}`,
      );
    }
  }
}