a proof of concept realtime collaborative text editor using atproto as a sync server jake.tngl.io/y-pds/
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add awareness

+121 -17
+4 -4
example/client-metadata.json
··· 1 1 { 2 - "client_id": "https://dc1b-66-108-106-210.ngrok-free.app/client-metadata.json", 3 - "client_uri": "https://dc1b-66-108-106-210.ngrok-free.app", 4 - "redirect_uris": ["https://dc1b-66-108-106-210.ngrok-free.app/callback.html"], 2 + "client_id": "https://25bf-66-108-106-210.ngrok-free.app/client-metadata.json", 3 + "client_uri": "https://25bf-66-108-106-210.ngrok-free.app", 4 + "redirect_uris": ["https://25bf-66-108-106-210.ngrok-free.app/callback.html"], 5 5 "application_type": "native", 6 6 "client_name": "atrtc demo", 7 7 "dpop_bound_access_tokens": true, 8 8 "grant_types": ["authorization_code", "refresh_token"], 9 9 "response_types": ["code"], 10 - "scope": "atproto repo?collection=com.jakelazaroff.ypds.doc&collection=com.jakelazaroff.ypds.update&collection=com.jakelazaroff.ypds.snapshot", 10 + "scope": "atproto repo?collection=com.jakelazaroff.ypds.doc&collection=com.jakelazaroff.ypds.update&collection=com.jakelazaroff.ypds.awareness&collection=com.jakelazaroff.ypds.snapshot", 11 11 "token_endpoint_auth_method": "none" 12 12 }
+41 -3
example/doc.html
··· 46 46 "prosemirror-example-setup": "https://esm.sh/prosemirror-example-setup", 47 47 "yjs": "https://esm.sh/yjs", 48 48 "y-prosemirror": "https://esm.sh/y-prosemirror", 49 + "y-protocols/awareness": "https://esm.sh/y-protocols/awareness", 49 50 "actor-typeahead": "https://esm.sh/actor-typeahead" 50 51 } 51 52 } ··· 60 61 import { EditorView } from "prosemirror-view"; 61 62 import { exampleSetup } from "prosemirror-example-setup"; 62 63 import * as Y from "yjs"; 63 - import { ySyncPlugin, yUndoPlugin } from "y-prosemirror"; 64 + import { ySyncPlugin, yUndoPlugin, yCursorPlugin } from "y-prosemirror"; 64 65 import { getSession } from "@atcute/oauth-browser-client"; 65 66 import { configure, client, resolve } from "./oauth.js"; 66 67 import { YPdsProvider, newDocUri } from "./y-pds.js"; ··· 81 82 82 83 const ydoc = new Y.Doc(); 83 84 const yxml = ydoc.getXmlFragment("prosemirror"); 85 + 86 + // provider must be created before the editor so we can pass its awareness to yCursorPlugin 87 + const provider = new YPdsProvider(ydoc, atUri, { rpc, did }); 84 88 85 89 const state = EditorState.create({ 86 90 schema, 87 - plugins: [...exampleSetup({ schema, history: false }), ySyncPlugin(yxml), yUndoPlugin()], 91 + plugins: [ 92 + ...exampleSetup({ schema, history: false }), 93 + ySyncPlugin(yxml), 94 + yUndoPlugin(), 95 + yCursorPlugin(provider.awareness, { 96 + cursorBuilder(user) { 97 + const el = document.createElement("span"); 98 + el.className = "collab-cursor"; 99 + el.style.setProperty("--color", user.color); 100 + el.dataset.name = user.name ?? ""; 101 + return el; 102 + }, 103 + }), 104 + ], 88 105 }); 89 106 90 107 const view = new EditorView(document.querySelector("#editor"), { state }); 91 108 92 - const provider = new YPdsProvider(ydoc, atUri, { rpc, did }); 93 109 await provider.load(); 94 110 95 111 const shareBtn = document.querySelector("#share"); ··· 209 225 border: 1px solid #ccc; 210 226 border-radius: 4px; 211 227 cursor: pointer; 228 + } 229 + 230 + .collab-cursor { 231 + position: relative; 232 + border-left: 2px solid var(--color); 233 + margin-left: -1px; 234 + pointer-events: none; 235 + } 236 + 237 + .collab-cursor::after { 238 + content: attr(data-name); 239 + position: absolute; 240 + top: -1.4em; 241 + left: -2px; 242 + background: var(--color); 243 + color: white; 244 + font-size: 0.7rem; 245 + font-family: sans-serif; 246 + padding: 0 4px; 247 + border-radius: 3px 3px 3px 0; 248 + white-space: nowrap; 249 + pointer-events: none; 212 250 } 213 251 </style> 214 252 </head>
+76 -10
example/y-pds.js
··· 1 1 /** @import { Client } from "@atcute/client"; */ 2 2 import { Client, simpleFetchHandler } from "@atcute/client"; 3 3 import * as Y from "yjs"; 4 + import { Awareness, encodeAwarenessUpdate, applyAwarenessUpdate } from "y-protocols/awareness"; 4 5 5 6 /** @param {string} did */ 6 7 async function clientForDid(did) { ··· 15 16 16 17 const COLLECTION = "com.jakelazaroff.ypds.update"; 17 18 const DOC_COLLECTION = "com.jakelazaroff.ypds.doc"; 19 + const CURSOR_COLLECTION = "com.jakelazaroff.ypds.awareness"; 18 20 19 21 const encode = (/** @type {Uint8Array} */ update) => btoa(String.fromCharCode(...update)); 20 22 const decode = (/** @type {string} */ b64) => Uint8Array.from(atob(b64), c => c.charCodeAt(0)); ··· 22 24 /** @param {string} did */ 23 25 export function newDocUri(did) { 24 26 return `at://${did}/${DOC_COLLECTION}/${crypto.randomUUID()}`; 27 + } 28 + 29 + /** @param {string} did */ 30 + function colorForDid(did) { 31 + let hash = 0; 32 + for (let i = 0; i < did.length; i++) { 33 + hash = (hash * 31 + did.charCodeAt(i)) >>> 0; 34 + } 35 + return `hsl(${hash % 360}, 70%, 45%)`; 25 36 } 26 37 27 38 const JETSTREAM = "wss://jetstream2.us-east.bsky.network/subscribe"; ··· 36 47 /** @type {WebSocket | null} */ 37 48 #ws = null; 38 49 50 + /** @type {ReturnType<typeof setTimeout> | null} */ 51 + #cursorTimer = null; 52 + 39 53 /** 40 54 * @param {Y.Doc} ydoc 41 55 * @param {string} atUri at://ownerDid/collection/rkey ··· 48 62 this.#rkey = rkey; 49 63 this.rpc = rpc; 50 64 this.did = did; 65 + this.awareness = new Awareness(ydoc); 51 66 } 52 67 53 68 async load() { ··· 65 80 } 66 81 }); 67 82 83 + // Fetch display name and set local awareness state 84 + const color = colorForDid(this.did); 85 + let name = this.did; 86 + try { 87 + const res = await fetch( 88 + `https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor=${encodeURIComponent(this.did)}`, 89 + ); 90 + if (res.ok) { 91 + const profile = await res.json(); 92 + name = profile.displayName || profile.handle || this.did; 93 + } 94 + } catch {} 95 + this.awareness.setLocalState({ user: { name, color } }); 96 + 97 + this.awareness.on("change", this.#onAwarenessChange); 68 98 this.#ydoc.on("update", this.#onUpdate); 69 99 this.#subscribe(new Set(repos)); 70 100 } ··· 103 133 }); 104 134 }; 105 135 136 + /** @param {{ added: number[], updated: number[] }} changes */ 137 + #onAwarenessChange = ({ added, updated }) => { 138 + const ours = this.awareness.clientID; 139 + if (!added.includes(ours) && !updated.includes(ours)) return; 140 + if (this.#cursorTimer) clearTimeout(this.#cursorTimer); 141 + this.#cursorTimer = setTimeout(() => this.#writeCursor(), 500); 142 + }; 143 + 144 + async #writeCursor() { 145 + const update = encodeAwarenessUpdate(this.awareness, [this.awareness.clientID]); 146 + await this.rpc.post("com.atproto.repo.putRecord", { 147 + params: {}, 148 + input: { 149 + repo: this.did, 150 + collection: CURSOR_COLLECTION, 151 + rkey: this.#rkey, 152 + record: { 153 + $type: CURSOR_COLLECTION, 154 + docId: this.#rkey, 155 + awareness: encode(update), 156 + createdAt: new Date().toISOString(), 157 + }, 158 + }, 159 + }); 160 + } 161 + 106 162 async #ensureDocument() { 107 163 const rpc = this.#ownerDid === this.did ? this.rpc : await clientForDid(this.#ownerDid); 108 164 const res = await rpc.get("com.atproto.repo.getRecord", { ··· 150 206 #subscribe(repos) { 151 207 const url = new URL(JETSTREAM); 152 208 url.searchParams.append("wantedCollections", COLLECTION); 209 + url.searchParams.append("wantedCollections", CURSOR_COLLECTION); 153 210 for (const repo of repos) url.searchParams.append("wantedDids", repo); 154 211 155 212 this.#ws = new WebSocket(url); 156 213 this.#ws.onmessage = async e => { 157 214 const event = JSON.parse(e.data); 158 215 if (event.kind !== "commit") return; 159 - if (event.commit.operation !== "create") return; 160 - if (event.commit.collection !== COLLECTION) return; 161 216 if (!repos.has(event.did)) return; 162 - if (event.commit.record.docId !== this.#rkey) return; 163 217 164 - // verify: re-fetch the record and confirm the CID matches 165 - const rpc = event.did === this.did ? this.rpc : await clientForDid(event.did); 166 - const res = await rpc.get("com.atproto.repo.getRecord", { 167 - params: { repo: event.did, collection: COLLECTION, rkey: event.commit.rkey }, 168 - }); 169 - if (!res.ok || res.data.cid !== event.commit.cid) return; 218 + if (event.commit.collection === COLLECTION) { 219 + if (event.commit.operation !== "create") return; 220 + if (event.commit.record.docId !== this.#rkey) return; 170 221 171 - Y.applyUpdate(this.#ydoc, decode(event.commit.record.update), this); 222 + // verify: re-fetch the record and confirm the CID matches 223 + const rpc = event.did === this.did ? this.rpc : await clientForDid(event.did); 224 + const res = await rpc.get("com.atproto.repo.getRecord", { 225 + params: { repo: event.did, collection: COLLECTION, rkey: event.commit.rkey }, 226 + }); 227 + if (!res.ok || res.data.cid !== event.commit.cid) return; 228 + 229 + Y.applyUpdate(this.#ydoc, decode(event.commit.record.update), this); 230 + } else if (event.commit.collection === CURSOR_COLLECTION) { 231 + if (event.did === this.did) return; 232 + if (event.commit.record.docId !== this.#rkey) return; 233 + applyAwarenessUpdate(this.awareness, decode(event.commit.record.awareness), "remote"); 234 + } 172 235 }; 173 236 } 174 237 175 238 destroy() { 176 239 this.#ydoc.off("update", this.#onUpdate); 240 + this.awareness.off("change", this.#onAwarenessChange); 241 + this.awareness.destroy(); 242 + if (this.#cursorTimer) clearTimeout(this.#cursorTimer); 177 243 this.#ws?.close(); 178 244 } 179 245 }