a proof of concept realtime collaborative text editor using atproto as a sync server jake.tngl.io/y-pds/
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Simplify y-pds.js more

+40 -19
+40 -19
y-pds.js
··· 58 58 * @param {{ rpc: Client, did: string, awareness?: Awareness, jetstream?: string }} options 59 59 */ 60 60 constructor(ydoc, atUri, { rpc, did, awareness, jetstream = JETSTREAM, cursorDebounce = 500 }) { 61 + this.#clients.set(did, rpc); 62 + 61 63 this.#ydoc = ydoc; 62 64 const [ownerDid, , rkey] = atUri.slice("at://".length).split("/"); 63 65 this.#ownerDid = ownerDid; 64 66 this.#rkey = rkey; 65 - this.rpc = rpc; 66 67 this.did = did; 67 68 this.awareness = awareness ?? new Awareness(ydoc); 68 69 this.#jetstream = jetstream; ··· 72 73 73 74 /** 74 75 * @params {object} options 75 - * @params {Client} [options.client] 76 76 * @params {string} [options.rkey] 77 77 * @params {string} options.repo 78 78 * @params {string} options.collection 79 79 */ 80 - async #getRecord({ client = this.rpc, repo, collection, rkey }) { 80 + async #getRecord({ repo, collection, rkey }) { 81 + const client = await this.#getClient(repo); 82 + 81 83 const res = await client.get("com.atproto.repo.getRecord", { 82 84 params: { repo, collection, rkey }, 83 85 }); ··· 87 89 88 90 /** 89 91 * @params {object} options 90 - * @params {Client} [options.client] 91 - * @params {string} [options.rkey] 92 + * @params {string} options.repo 93 + * @params {string} options.collection 94 + * @params {number} [options.limit] 95 + * @params {string} [options.cursor] 96 + */ 97 + async #listRecords({ repo, collection, limit, cursor }) { 98 + const client = await this.#getClient(repo); 99 + 100 + const res = await client.get("com.atproto.repo.listRecords", { 101 + params: { repo, collection, limit: 100, cursor }, 102 + }); 103 + 104 + if (!res.ok) throw new Error(res.data.message); 105 + return res.data; 106 + } 107 + 108 + /** 109 + * @params {object} options 92 110 * @params {string} options.repo 93 111 * @params {string} options.collection 94 112 * @params {object} options.record 113 + * @params {string} [options.rkey] 95 114 */ 96 - async #upsertRecord({ client = this.rpc, repo, collection, rkey, record }) { 115 + async #upsertRecord({ repo, collection, rkey, record }) { 116 + const client = await this.#getClient(repo); 97 117 const verb = rkey ? "com.atproto.repo.putRecord" : "com.atproto.repo.createRecord"; 118 + 98 119 const res = await client.post(verb, { 99 120 params: {}, 100 121 input: { repo, collection, rkey, record: { $type: collection, ...record } }, ··· 124 145 } 125 146 126 147 async #fetchUpdates(repo) { 127 - const rpc = repo === this.did ? this.rpc : await this.#getClient(repo); 128 148 const records = []; 129 149 let cursor; 130 150 131 151 do { 132 - const res = await rpc.get("com.atproto.repo.listRecords", { 133 - params: { repo, collection: UPDATE_COLLECTION, limit: 100, cursor }, 152 + const res = await this.#listRecords({ 153 + repo, 154 + collection: UPDATE_COLLECTION, 155 + limit: 100, 156 + cursor, 134 157 }); 135 - records.push(...res.data.records); 136 - cursor = res.data.cursor; 158 + records.push(...res.records.filter(r => r.value.docId === this.#rkey)); 159 + cursor = res.cursor; 137 160 } while (cursor); 138 161 139 - return records.filter(r => r.value.docId === this.#rkey); 162 + return records; 140 163 } 141 164 142 - /** @param {Uint8Array} update @param {unknown} origin */ 165 + /** 166 + * @param {Uint8Array} update 167 + * @param {unknown} origin 168 + */ 143 169 #onUpdate = async (update, origin) => { 144 170 if (origin === this) return; 145 171 await this.#upsertRecord({ ··· 164 190 }; 165 191 166 192 async #ensureDocument() { 167 - const rpc = this.#ownerDid === this.did ? this.rpc : await this.#getClient(this.#ownerDid); 168 193 try { 169 194 const data = await this.#getRecord({ 170 - client: rpc, 171 195 repo: this.#ownerDid, 172 196 collection: DOC_COLLECTION, 173 197 rkey: this.#rkey, ··· 188 212 } 189 213 190 214 async #fetchDocument() { 191 - const rpc = this.#ownerDid === this.did ? this.rpc : await clientForDid(this.#ownerDid); 192 215 const data = await this.#getRecord({ 193 - client: rpc, 194 216 repo: this.#ownerDid, 195 217 collection: DOC_COLLECTION, 196 218 rkey: this.#rkey, 197 219 }); 220 + 198 221 return data.value; 199 222 } 200 223 ··· 244 267 if (event.commit.record.docId !== this.#rkey) return; 245 268 246 269 // verify: re-fetch the record and confirm the CID matches 247 - const rpc = event.did === this.did ? this.rpc : await this.#getClient(event.did); 248 270 try { 249 271 const data = await this.#getRecord({ 250 - client: rpc, 251 272 repo: event.did, 252 273 collection: UPDATE_COLLECTION, 253 274 rkey: event.commit.rkey,