a proof of concept realtime collaborative text editor using atproto as a sync server jake.tngl.io/y-pds/
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Simplify y-pds.js even more

+26 -59
+26 -59
y-pds.js
··· 32 32 33 33 client = new Client({ handler: simpleFetchHandler({ service: pds }) }); 34 34 35 - this.#clients.set(did); 35 + this.#clients.set(did, client); 36 36 return client; 37 37 } 38 38 39 - #ydoc = new Y.Doc(); 39 + /** @type {Y.Doc} */ 40 + #ydoc; 40 41 41 42 #ownerDid = ""; 42 43 #rkey = ""; ··· 46 47 47 48 #jetstream = JETSTREAM; 48 49 49 - /** @type {Set<string>} */ 50 - #repos = new Set(); 50 + /** @type {{ editors: string[], [key: string]: unknown } | null} */ 51 + #doc = null; 51 52 52 53 #cursor = 0; 53 54 #destroyed = false; ··· 57 58 * @param {string} atUri at://ownerDid/collection/rkey 58 59 * @param {{ rpc: Client, did: string, awareness?: Awareness, jetstream?: string }} options 59 60 */ 60 - constructor(ydoc, atUri, { rpc, did, awareness, jetstream = JETSTREAM, cursorDebounce = 500 }) { 61 + constructor(ydoc, atUri, { rpc, did, awareness, jetstream = JETSTREAM }) { 61 62 this.#clients.set(did, rpc); 62 63 63 64 this.#ydoc = ydoc; ··· 69 70 this.#jetstream = jetstream; 70 71 71 72 this.awareness.on("change", this.#onAwarenessChange); 73 + this.#ydoc.on("update", this.#onUpdate); 72 74 } 73 75 74 76 /** ··· 91 93 * @params {object} options 92 94 * @params {string} options.repo 93 95 * @params {string} options.collection 94 - * @params {number} [options.limit] 95 96 * @params {string} [options.cursor] 96 97 */ 97 - async #listRecords({ repo, collection, limit, cursor }) { 98 + async #listRecords({ repo, collection, cursor }) { 98 99 const client = await this.#getClient(repo); 99 100 100 101 const res = await client.get("com.atproto.repo.listRecords", { ··· 126 127 } 127 128 128 129 async load() { 129 - const doc = await this.#ensureDocument(); 130 - this.#repos = new Set([this.#ownerDid, ...doc.editors]); 130 + this.#doc = await this.#ensureDocument(); 131 + 132 + const repos = [this.#ownerDid].concat(this.#doc.editors); 133 + const updates = await Promise.all(repos.map(repo => this.#fetchUpdates(repo))); 131 134 132 - const perRepo = await Promise.all([...this.#repos].map(repo => this.#fetchUpdates(repo))); 133 - const updates = perRepo 135 + const sorted = updates 134 136 .flat() 135 137 .sort((a, b) => a.value.createdAt.localeCompare(b.value.createdAt)); 136 - 137 138 Y.transact(this.#ydoc, () => { 138 - for (const { value } of updates) { 139 + for (const { value } of sorted) { 139 140 Y.applyUpdate(this.#ydoc, decode(value.update)); 140 141 } 141 142 }); 142 143 143 - this.#ydoc.on("update", this.#onUpdate); 144 + this.#ws?.close(); 144 145 this.#subscribe(); 145 146 } 146 147 ··· 149 150 let cursor; 150 151 151 152 do { 152 - const res = await this.#listRecords({ 153 - repo, 154 - collection: UPDATE_COLLECTION, 155 - limit: 100, 156 - cursor, 157 - }); 153 + const res = await this.#listRecords({ repo, collection: UPDATE_COLLECTION, cursor }); 158 154 records.push(...res.records.filter(r => r.value.docId === this.#rkey)); 159 155 cursor = res.cursor; 160 156 } while (cursor); ··· 211 207 return record; 212 208 } 213 209 214 - async #fetchDocument() { 215 - const data = await this.#getRecord({ 216 - repo: this.#ownerDid, 217 - collection: DOC_COLLECTION, 218 - rkey: this.#rkey, 219 - }); 220 - 221 - return data.value; 222 - } 223 - 224 210 getMembers() { 225 - return [...this.#repos].filter(r => r !== this.#ownerDid); 211 + return this.#doc.editors; 226 212 } 227 213 228 214 async setMembers(editors) { 229 - const doc = await this.#fetchDocument(); 230 215 await this.#upsertRecord({ 231 216 repo: this.did, 232 217 collection: DOC_COLLECTION, 233 218 rkey: this.#rkey, 234 - record: { ...doc, editors }, 219 + record: { ...this.#doc, editors }, 235 220 }); 221 + this.#doc = { ...this.#doc, editors }; 236 222 } 237 223 238 224 #subscribe() { ··· 240 226 url.searchParams.append("wantedCollections", UPDATE_COLLECTION); 241 227 url.searchParams.append("wantedCollections", AWARENESS_COLLECTION); 242 228 url.searchParams.append("wantedCollections", DOC_COLLECTION); 243 - for (const repo of this.#repos) url.searchParams.append("wantedDids", repo); 229 + url.searchParams.append("wantedDids", this.#ownerDid); 230 + for (const editor of this.#doc.editors) url.searchParams.append("wantedDids", editor); 244 231 245 232 if (this.#cursor) url.searchParams.set("cursor", this.#cursor); 246 233 247 234 this.#ws = new WebSocket(url); 248 235 this.#ws.onclose = e => { 249 236 if (e.target !== this.#ws || this.#destroyed) return; 250 - setTimeout(() => this.#subscribe(), 1000); 237 + setTimeout(() => { 238 + if (!this.#destroyed) this.#subscribe(); 239 + }, 1000); 251 240 }; 252 241 this.#ws.onmessage = async e => { 253 242 const event = JSON.parse(e.data); ··· 256 245 257 246 if (event.commit.collection === DOC_COLLECTION) { 258 247 if (event.did !== this.#ownerDid || event.commit.rkey !== this.#rkey) return; 259 - await this.#onMembersChange(); 248 + await this.load(); 260 249 return; 261 250 } 262 251 263 - if (!this.#repos.has(event.did)) return; 252 + if (event.did !== this.#ownerDid && !this.#doc.editors.includes(event.did)) return; 264 253 265 254 if (event.commit.collection === UPDATE_COLLECTION) { 266 255 if (event.commit.operation !== "create") return; ··· 285 274 applyAwarenessUpdate(this.awareness, decode(event.commit.record.awareness), "remote"); 286 275 } 287 276 }; 288 - } 289 - 290 - async #onMembersChange() { 291 - const doc = await this.#fetchDocument(); 292 - const newRepos = new Set([this.#ownerDid, ...doc.editors]); 293 - 294 - const added = [...newRepos].filter(r => !this.#repos.has(r)); 295 - if (added.length > 0) { 296 - const perRepo = await Promise.all(added.map(repo => this.#fetchUpdates(repo))); 297 - const updates = perRepo 298 - .flat() 299 - .sort((a, b) => a.value.createdAt.localeCompare(b.value.createdAt)); 300 - Y.transact(this.#ydoc, () => { 301 - for (const { value } of updates) { 302 - Y.applyUpdate(this.#ydoc, decode(value.update)); 303 - } 304 - }); 305 - } 306 - 307 - this.#repos = newRepos; 308 - this.#ws?.close(); 309 - this.#subscribe(); 310 277 } 311 278 312 279 destroy() {