a proof of concept realtime collaborative text editor using atproto as a sync server jake.tngl.io/y-pds/
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add throttling

+43 -8
+43 -8
y-pds.js
··· 118 118 /** @type {((editors: string[]) => void) | null} */ 119 119 onMembersChange = null; 120 120 121 - constructor({ ydoc, client, atUri, did, awareness, jetstream }) { 121 + #throttle = 5000; 122 + 123 + /** @type {Uint8Array[]} */ 124 + #pendingUpdates = []; 125 + #updateTimerId = null; 126 + 127 + #awarenessDirty = false; 128 + #awarenessTimerId = null; 129 + 130 + constructor({ ydoc, client, atUri, did, awareness, jetstream, throttle }) { 122 131 this.#clients.set(did, client); 123 132 this.#ydoc = ydoc; 124 133 ··· 128 137 this.did = did; 129 138 this.awareness = awareness ?? new Awareness(ydoc); 130 139 this.#jetstream = jetstream ?? this.#jetstream; 140 + if (throttle !== undefined) this.#throttle = throttle; 131 141 132 142 this.awareness.on("change", this.#onAwarenessChange); 133 143 this.#ydoc.on("update", this.#onUpdate); ··· 190 200 * @param {Uint8Array} update 191 201 * @param {unknown} origin 192 202 */ 193 - #onUpdate = async (update, origin) => { 194 - if (origin === this) return; 203 + #flushUpdate = async () => { 204 + this.#updateTimerId = null; 205 + if (this.#pendingUpdates.length === 0) return; 206 + const merged = this.#pendingUpdates.length === 1 207 + ? this.#pendingUpdates[0] 208 + : Y.mergeUpdates(this.#pendingUpdates); 209 + this.#pendingUpdates = []; 195 210 await this.#upsertRecord({ 196 211 repo: this.did, 197 212 collection: UPDATE_COLLECTION, 198 - record: { docId: this.#rkey, update: encode(update), createdAt: new Date().toISOString() }, 213 + record: { docId: this.#rkey, update: encode(merged), createdAt: new Date().toISOString() }, 199 214 }); 200 215 }; 201 216 217 + #onUpdate = (update, origin) => { 218 + if (origin === this) return; 219 + this.#pendingUpdates.push(update); 220 + if (this.#updateTimerId == null) { 221 + this.#updateTimerId = setTimeout(this.#flushUpdate, this.#throttle); 222 + } 223 + }; 224 + 202 225 /** @param {{ added: number[], updated: number[] }} changes */ 203 - #onAwarenessChange = async ({ added, updated }) => { 204 - const ours = this.awareness.clientID; 205 - if (!added.includes(ours) && !updated.includes(ours)) return; 206 - 226 + #flushAwareness = async () => { 227 + this.#awarenessTimerId = null; 228 + if (!this.#awarenessDirty) return; 229 + this.#awarenessDirty = false; 207 230 const update = encodeAwarenessUpdate(this.awareness, [this.awareness.clientID]); 208 231 await this.#upsertRecord({ 209 232 repo: this.did, ··· 211 234 rkey: this.#rkey, 212 235 record: { docId: this.#rkey, awareness: encode(update), createdAt: new Date().toISOString() }, 213 236 }); 237 + }; 238 + 239 + #onAwarenessChange = ({ added, updated }) => { 240 + const ours = this.awareness.clientID; 241 + if (!added.includes(ours) && !updated.includes(ours)) return; 242 + this.#awarenessDirty = true; 243 + if (this.#awarenessTimerId == null) { 244 + this.#awarenessTimerId = setTimeout(this.#flushAwareness, this.#throttle); 245 + } 214 246 }; 215 247 216 248 getMembers() { ··· 295 327 this.#destroyed = true; 296 328 this.#ydoc.off("update", this.#onUpdate); 297 329 this.awareness.off("change", this.#onAwarenessChange); 330 + clearTimeout(this.#updateTimerId); 331 + clearTimeout(this.#awarenessTimerId); 332 + this.#flushUpdate(); 298 333 this.awareness.destroy(); 299 334 this.#ws?.close(); 300 335 }