a proof of concept realtime collaborative text editor using atproto as a sync server jake.tngl.io/y-pds/
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Keep members up-to-date

+90 -51
+28 -7
app.js
··· 14 14 15 15 configure(); 16 16 17 + /** @param {string} did */ 18 + function colorForDid(did) { 19 + let hash = 0; 20 + for (let i = 0; i < did.length; i++) { 21 + hash = (hash * 31 + did.charCodeAt(i)) >>> 0; 22 + } 23 + return `hsl(${hash % 360}, 70%, 45%)`; 24 + } 25 + 17 26 export class App extends Component { 18 27 loading = signal(false); 19 28 did = signal(""); ··· 115 124 116 125 this.view = new EditorView(this.editorRef.current, { state }); 117 126 await this.provider.load(); 127 + 128 + const color = colorForDid(this.props.did); 129 + let name = this.props.did; 130 + try { 131 + const res = await fetch( 132 + `https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor=${encodeURIComponent(this.props.did)}`, 133 + ); 134 + if (res.ok) { 135 + const profile = await res.json(); 136 + name = profile.displayName || profile.handle || this.props.did; 137 + } 138 + } catch {} 139 + this.provider.awareness.setLocalState({ user: { name, color } }); 118 140 } 119 141 120 142 componentWillUnmount() { ··· 140 162 141 163 async open(provider) { 142 164 this.provider = provider; 143 - const doc = await provider.getDocument(); 144 - this.members.value = doc.members; 165 + this.members.value = provider.getMembers(); 145 166 this.dialogRef.current.showModal(); 146 167 } 147 168 ··· 150 171 const identifier = e.currentTarget.did.value.trim(); 151 172 if (!identifier) return; 152 173 const newDid = await resolve(identifier); 153 - const doc = await this.provider.getDocument(); 154 - if (doc.members.includes(newDid)) return; 155 - const updated = [...doc.members, newDid]; 156 - await this.provider.updateMembers(updated); 174 + const members = this.provider.getMembers(); 175 + if (members.includes(newDid)) return; 176 + const updated = [...members, newDid]; 177 + await this.provider.setMembers(updated); 157 178 this.members.value = updated; 158 179 e.currentTarget.reset(); 159 180 } 160 181 161 182 async removeMember(memberDid) { 162 183 const updated = this.members.value.filter(m => m !== memberDid); 163 - await this.provider.updateMembers(updated); 184 + await this.provider.setMembers(updated); 164 185 this.members.value = updated; 165 186 } 166 187
+62 -44
y-pds.js
··· 7 7 const url = did.startsWith("did:web:") 8 8 ? `https://${did.slice("did:web:".length)}/.well-known/did.json` 9 9 : `https://plc.directory/${did}`; 10 - const doc = await fetch(url).then((r) => r.json()); 11 - const pds = doc.service?.find((s) => s.type === "AtprotoPersonalDataServer")?.serviceEndpoint; 10 + const doc = await fetch(url).then(r => r.json()); 11 + const pds = doc.service?.find(s => s.type === "AtprotoPersonalDataServer")?.serviceEndpoint; 12 12 if (!pds) throw new Error(`no PDS found for ${did}`); 13 13 return new Client({ handler: simpleFetchHandler({ service: pds }) }); 14 14 } ··· 18 18 const CURSOR_COLLECTION = "com.jakelazaroff.ypds.awareness"; 19 19 20 20 const encode = (/** @type {Uint8Array} */ update) => btoa(String.fromCharCode(...update)); 21 - const decode = (/** @type {string} */ b64) => Uint8Array.from(atob(b64), (c) => c.charCodeAt(0)); 22 - 23 - /** @param {string} did */ 24 - function colorForDid(did) { 25 - let hash = 0; 26 - for (let i = 0; i < did.length; i++) { 27 - hash = (hash * 31 + did.charCodeAt(i)) >>> 0; 28 - } 29 - return `hsl(${hash % 360}, 70%, 45%)`; 30 - } 21 + const decode = (/** @type {string} */ b64) => Uint8Array.from(atob(b64), c => c.charCodeAt(0)); 31 22 32 23 const JETSTREAM = "wss://jetstream2.us-east.bsky.network/subscribe"; 33 24 ··· 44 35 /** @type {ReturnType<typeof setTimeout> | null} */ 45 36 #cursorTimer = null; 46 37 38 + #jetstream = JETSTREAM; 39 + #cursorDebounce = 500; 40 + 41 + /** @type {Set<string>} */ 42 + #repos = new Set(); 43 + 47 44 /** 48 45 * @param {Y.Doc} ydoc 49 - * @param {string} atUri at://ownerDid/collection/rkey 50 - * @param {{ rpc: Client, did: string }} options 46 + * @param {string} atUri at://ownerDid/collection/rkey 47 + * @param {{ rpc: Client, did: string, awareness?: Awareness, jetstream?: string, cursorDebounce?: number }} options 51 48 */ 52 - constructor(ydoc, atUri, { rpc, did }) { 49 + constructor(ydoc, atUri, { rpc, did, awareness, jetstream = JETSTREAM, cursorDebounce = 500 }) { 53 50 this.#ydoc = ydoc; 54 51 const [ownerDid, , rkey] = atUri.slice("at://".length).split("/"); 55 52 this.#ownerDid = ownerDid; 56 53 this.#rkey = rkey; 57 54 this.rpc = rpc; 58 55 this.did = did; 59 - this.awareness = new Awareness(ydoc); 56 + this.awareness = awareness ?? new Awareness(ydoc); 57 + this.#jetstream = jetstream; 58 + this.#cursorDebounce = cursorDebounce; 60 59 } 61 60 62 61 async load() { 63 62 const doc = await this.#ensureDocument(); 64 - const repos = [this.#ownerDid, ...doc.members]; 63 + this.#repos = new Set([this.#ownerDid, ...doc.members]); 65 64 66 - const perRepo = await Promise.all(repos.map((repo) => this.#fetchUpdates(repo))); 65 + const perRepo = await Promise.all([...this.#repos].map(repo => this.#fetchUpdates(repo))); 67 66 const updates = perRepo 68 67 .flat() 69 68 .sort((a, b) => a.value.createdAt.localeCompare(b.value.createdAt)); ··· 74 73 } 75 74 }); 76 75 77 - // Fetch display name and set local awareness state 78 - const color = colorForDid(this.did); 79 - let name = this.did; 80 - try { 81 - const res = await fetch( 82 - `https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor=${encodeURIComponent(this.did)}`, 83 - ); 84 - if (res.ok) { 85 - const profile = await res.json(); 86 - name = profile.displayName || profile.handle || this.did; 87 - } 88 - } catch {} 89 - this.awareness.setLocalState({ user: { name, color } }); 90 - 91 76 this.awareness.on("change", this.#onAwarenessChange); 92 77 this.#ydoc.on("update", this.#onUpdate); 93 - this.#subscribe(new Set(repos)); 78 + this.#subscribe(); 94 79 } 95 80 96 81 async #fetchUpdates(repo) { ··· 106 91 cursor = res.data.cursor; 107 92 } while (cursor); 108 93 109 - return records.filter((r) => r.value.docId === this.#rkey); 94 + return records.filter(r => r.value.docId === this.#rkey); 110 95 } 111 96 112 97 /** @param {Uint8Array} update @param {unknown} origin */ ··· 132 117 const ours = this.awareness.clientID; 133 118 if (!added.includes(ours) && !updated.includes(ours)) return; 134 119 if (this.#cursorTimer) clearTimeout(this.#cursorTimer); 135 - this.#cursorTimer = setTimeout(() => this.#writeCursor(), 500); 120 + this.#cursorTimer = setTimeout(() => this.#writeCursor(), this.#cursorDebounce); 136 121 }; 137 122 138 123 async #writeCursor() { ··· 175 160 return record; 176 161 } 177 162 178 - async getDocument() { 163 + async #fetchDocument() { 179 164 const rpc = this.#ownerDid === this.did ? this.rpc : await clientForDid(this.#ownerDid); 180 165 const res = await rpc.get("com.atproto.repo.getRecord", { 181 166 params: { repo: this.#ownerDid, collection: DOC_COLLECTION, rkey: this.#rkey }, ··· 184 169 return res.data.value; 185 170 } 186 171 187 - async updateMembers(members) { 188 - const doc = await this.getDocument(); 172 + getMembers() { 173 + return [...this.#repos].filter(r => r !== this.#ownerDid); 174 + } 175 + 176 + async setMembers(members) { 177 + const doc = await this.#fetchDocument(); 189 178 await this.rpc.post("com.atproto.repo.putRecord", { 190 179 input: { 191 180 repo: this.did, ··· 196 185 }); 197 186 } 198 187 199 - /** @param {Set<string>} repos */ 200 - #subscribe(repos) { 201 - const url = new URL(JETSTREAM); 188 + #subscribe() { 189 + const url = new URL(this.#jetstream); 202 190 url.searchParams.append("wantedCollections", COLLECTION); 203 191 url.searchParams.append("wantedCollections", CURSOR_COLLECTION); 204 - for (const repo of repos) url.searchParams.append("wantedDids", repo); 192 + url.searchParams.append("wantedCollections", DOC_COLLECTION); 193 + for (const repo of this.#repos) url.searchParams.append("wantedDids", repo); 205 194 206 195 this.#ws = new WebSocket(url); 207 - this.#ws.onmessage = async (e) => { 196 + this.#ws.onmessage = async e => { 208 197 const event = JSON.parse(e.data); 209 198 if (event.kind !== "commit") return; 210 - if (!repos.has(event.did)) return; 199 + 200 + if (event.commit.collection === DOC_COLLECTION) { 201 + if (event.did !== this.#ownerDid || event.commit.rkey !== this.#rkey) return; 202 + await this.#onMembersChange(); 203 + return; 204 + } 205 + 206 + if (!this.#repos.has(event.did)) return; 211 207 212 208 if (event.commit.collection === COLLECTION) { 213 209 if (event.commit.operation !== "create") return; ··· 227 223 applyAwarenessUpdate(this.awareness, decode(event.commit.record.awareness), "remote"); 228 224 } 229 225 }; 226 + } 227 + 228 + async #onMembersChange() { 229 + const doc = await this.#fetchDocument(); 230 + const newRepos = new Set([this.#ownerDid, ...doc.members]); 231 + 232 + const added = [...newRepos].filter(r => !this.#repos.has(r)); 233 + if (added.length > 0) { 234 + const perRepo = await Promise.all(added.map(repo => this.#fetchUpdates(repo))); 235 + const updates = perRepo 236 + .flat() 237 + .sort((a, b) => a.value.createdAt.localeCompare(b.value.createdAt)); 238 + Y.transact(this.#ydoc, () => { 239 + for (const { value } of updates) { 240 + Y.applyUpdate(this.#ydoc, decode(value.update)); 241 + } 242 + }); 243 + } 244 + 245 + this.#repos = newRepos; 246 + this.#ws?.close(); 247 + this.#subscribe(); 230 248 } 231 249 232 250 destroy() {