a proof of concept realtime collaborative text editor using atproto as a sync server jake.tngl.io/y-pds/
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Simplify y-pds.js

+151 -101
+33 -13
app.js
··· 7 7 import { schema } from "prosemirror-schema-basic"; 8 8 import { EditorState } from "prosemirror-state"; 9 9 import { EditorView } from "prosemirror-view"; 10 - import { exampleSetup } from "prosemirror-example-setup"; 10 + import { exampleSetup, buildMenuItems } from "prosemirror-example-setup"; 11 11 import { configure, client, resolve, scope } from "./oauth.js"; 12 12 import { DOC_COLLECTION, YPdsProvider } from "./y-pds.js"; 13 13 import "actor-typeahead"; 14 14 15 15 configure(); 16 + 17 + /** @param {string} did */ 18 + async function fetchProfile(did) { 19 + try { 20 + const res = await fetch( 21 + `https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor=${encodeURIComponent(did)}`, 22 + ); 23 + if (res.ok) { 24 + const p = await res.json(); 25 + return p.displayName ? `${p.displayName} (@${p.handle})` : `@${p.handle}`; 26 + } 27 + } catch {} 28 + return did; 29 + } 16 30 17 31 /** @param {string} did */ 18 32 function colorForDid(did) { ··· 176 190 177 191 async open(provider) { 178 192 this.provider = provider; 179 - this.editors.value = provider.getMembers(); 193 + const dids = provider.getMembers(); 194 + this.editors.value = await Promise.all( 195 + dids.map(async did => ({ did, label: await fetchProfile(did) })), 196 + ); 180 197 this.dialogRef.current.showModal(); 181 198 } 182 199 ··· 185 202 const identifier = e.currentTarget.did.value.trim(); 186 203 if (!identifier) return; 187 204 const newDid = await resolve(identifier); 188 - const editors = this.provider.getMembers(); 189 - if (editors.includes(newDid)) return; 190 - const updated = [...editors, newDid]; 205 + const dids = this.provider.getMembers(); 206 + if (dids.includes(newDid)) return; 207 + const updated = [...dids, newDid]; 191 208 await this.provider.setMembers(updated); 192 - this.editors.value = updated; 209 + this.editors.value = [ 210 + ...this.editors.value, 211 + { did: newDid, label: await fetchProfile(newDid) }, 212 + ]; 193 213 e.currentTarget.reset(); 194 214 } 195 215 196 - async removeMember(memberDid) { 197 - const updated = this.editors.value.filter(m => m !== memberDid); 216 + async removeMember(did) { 217 + const updated = this.editors.value.filter(m => m.did !== did).map(m => m.did); 198 218 await this.provider.setMembers(updated); 199 - this.editors.value = updated; 219 + this.editors.value = this.editors.value.filter(m => m.did !== did); 200 220 } 201 221 202 222 render() { ··· 205 225 <h2>Share</h2> 206 226 <ul id="editors"> 207 227 ${this.editors.value.map( 208 - m => html` 209 - <li key=${m}> 210 - <span>${m}</span> 211 - <button type="button" onClick=${() => this.removeMember(m)}>Remove</button> 228 + ({ did, label }) => html` 229 + <li key=${did}> 230 + <span>${label}</span> 231 + <button type="button" onClick=${() => this.removeMember(did)}>Remove</button> 212 232 </li> 213 233 `, 214 234 )}
+3 -3
client-metadata.json
··· 1 1 { 2 - "client_id": "https://4da7-66-108-106-210.ngrok-free.app/client-metadata.json", 3 - "client_uri": "https://4da7-66-108-106-210.ngrok-free.app", 4 - "redirect_uris": ["https://4da7-66-108-106-210.ngrok-free.app/callback.html"], 2 + "client_id": "https://ff10-66-108-106-210.ngrok-free.app/client-metadata.json", 3 + "client_uri": "https://ff10-66-108-106-210.ngrok-free.app", 4 + "redirect_uris": ["https://ff10-66-108-106-210.ngrok-free.app/callback.html"], 5 5 "application_type": "native", 6 6 "client_name": "atrtc demo", 7 7 "dpop_bound_access_tokens": true,
+115 -85
y-pds.js
··· 2 2 import * as Y from "yjs"; 3 3 import { Awareness, encodeAwarenessUpdate, applyAwarenessUpdate } from "y-protocols/awareness"; 4 4 5 - /** @param {string} did */ 6 - async function clientForDid(did) { 7 - const url = did.startsWith("did:web:") 8 - ? `https://${did.slice("did:web:".length)}/.well-known/did.json` 9 - : `https://plc.directory/${did}`; 10 - const doc = await fetch(url).then(r => r.json()); 11 - const pds = doc.service?.find(s => s.type === "AtprotoPersonalDataServer")?.serviceEndpoint; 12 - if (!pds) throw new Error(`no PDS found for ${did}`); 13 - return new Client({ handler: simpleFetchHandler({ service: pds }) }); 14 - } 15 - 16 5 export const DOC_COLLECTION = "com.jakelazaroff.ypds.doc"; 17 6 export const UPDATE_COLLECTION = "com.jakelazaroff.ypds.update"; 18 7 export const AWARENESS_COLLECTION = "com.jakelazaroff.ypds.awareness"; ··· 23 12 const JETSTREAM = "wss://jetstream2.us-east.bsky.network/subscribe"; 24 13 25 14 export class YPdsProvider { 26 - /** @type {Y.Doc} */ 27 - #ydoc; 15 + /** @type {Map<string, Client>} */ 16 + #clients = new Map(); 17 + 18 + /** @param {string} did */ 19 + async #getClient(did) { 20 + let client = this.#clients.get(did); 21 + if (client) return client; 22 + 23 + const url = did.startsWith("did:web:") 24 + ? `https://${did.slice("did:web:".length)}/.well-known/did.json` 25 + : `https://plc.directory/${did}`; 26 + 27 + const res = await fetch(url), 28 + doc = await res.json(); 29 + 30 + const pds = doc.service?.find(s => s.type === "AtprotoPersonalDataServer")?.serviceEndpoint; 31 + if (!pds) throw new Error(`no PDS found for ${did}`); 32 + 33 + client = new Client({ handler: simpleFetchHandler({ service: pds }) }); 34 + 35 + this.#clients.set(did); 36 + return client; 37 + } 38 + 39 + #ydoc = new Y.Doc(); 28 40 29 41 #ownerDid = ""; 30 42 #rkey = ""; ··· 32 44 /** @type {WebSocket | null} */ 33 45 #ws = null; 34 46 35 - /** @type {ReturnType<typeof setTimeout> | null} */ 36 - #cursorTimer = null; 37 - 38 47 #jetstream = JETSTREAM; 39 - #cursorDebounce = 500; 40 48 41 49 /** @type {Set<string>} */ 42 50 #repos = new Set(); ··· 47 55 /** 48 56 * @param {Y.Doc} ydoc 49 57 * @param {string} atUri at://ownerDid/collection/rkey 50 - * @param {{ rpc: Client, did: string, awareness?: Awareness, jetstream?: string, cursorDebounce?: number }} options 58 + * @param {{ rpc: Client, did: string, awareness?: Awareness, jetstream?: string }} options 51 59 */ 52 60 constructor(ydoc, atUri, { rpc, did, awareness, jetstream = JETSTREAM, cursorDebounce = 500 }) { 53 61 this.#ydoc = ydoc; ··· 58 66 this.did = did; 59 67 this.awareness = awareness ?? new Awareness(ydoc); 60 68 this.#jetstream = jetstream; 61 - this.#cursorDebounce = cursorDebounce; 69 + 70 + this.awareness.on("change", this.#onAwarenessChange); 71 + } 72 + 73 + /** 74 + * @params {object} options 75 + * @params {Client} [options.client] 76 + * @params {string} [options.rkey] 77 + * @params {string} options.repo 78 + * @params {string} options.collection 79 + */ 80 + async #getRecord({ client = this.rpc, repo, collection, rkey }) { 81 + const res = await client.get("com.atproto.repo.getRecord", { 82 + params: { repo, collection, rkey }, 83 + }); 84 + if (!res.ok) throw new Error(res.data.message); 85 + return res.data; 86 + } 87 + 88 + /** 89 + * @params {object} options 90 + * @params {Client} [options.client] 91 + * @params {string} [options.rkey] 92 + * @params {string} options.repo 93 + * @params {string} options.collection 94 + * @params {object} options.record 95 + */ 96 + async #upsertRecord({ client = this.rpc, repo, collection, rkey, record }) { 97 + const verb = rkey ? "com.atproto.repo.putRecord" : "com.atproto.repo.createRecord"; 98 + const res = await client.post(verb, { 99 + params: {}, 100 + input: { repo, collection, rkey, record: { $type: collection, ...record } }, 101 + }); 102 + 103 + if (!res.ok) throw new Error(res.data.message); 104 + return res.data; 62 105 } 63 106 64 107 async load() { ··· 76 119 } 77 120 }); 78 121 79 - this.awareness.on("change", this.#onAwarenessChange); 80 122 this.#ydoc.on("update", this.#onUpdate); 81 123 this.#subscribe(); 82 124 } 83 125 84 126 async #fetchUpdates(repo) { 85 - const rpc = repo === this.did ? this.rpc : await clientForDid(repo); 127 + const rpc = repo === this.did ? this.rpc : await this.#getClient(repo); 86 128 const records = []; 87 129 let cursor; 88 130 ··· 100 142 /** @param {Uint8Array} update @param {unknown} origin */ 101 143 #onUpdate = async (update, origin) => { 102 144 if (origin === this) return; 103 - await this.rpc.post("com.atproto.repo.createRecord", { 104 - params: {}, 105 - input: { 106 - repo: this.did, 107 - collection: UPDATE_COLLECTION, 108 - record: { 109 - $type: UPDATE_COLLECTION, 110 - docId: this.#rkey, 111 - update: encode(update), 112 - createdAt: new Date().toISOString(), 113 - }, 114 - }, 145 + await this.#upsertRecord({ 146 + repo: this.did, 147 + collection: UPDATE_COLLECTION, 148 + record: { docId: this.#rkey, update: encode(update), createdAt: new Date().toISOString() }, 115 149 }); 116 150 }; 117 151 118 152 /** @param {{ added: number[], updated: number[] }} changes */ 119 - #onAwarenessChange = ({ added, updated }) => { 153 + #onAwarenessChange = async ({ added, updated }) => { 120 154 const ours = this.awareness.clientID; 121 155 if (!added.includes(ours) && !updated.includes(ours)) return; 122 - if (this.#cursorTimer) clearTimeout(this.#cursorTimer); 123 - this.#cursorTimer = setTimeout(() => this.#writeCursor(), this.#cursorDebounce); 124 - }; 125 156 126 - async #writeCursor() { 127 157 const update = encodeAwarenessUpdate(this.awareness, [this.awareness.clientID]); 128 - await this.rpc.post("com.atproto.repo.putRecord", { 129 - params: {}, 130 - input: { 131 - repo: this.did, 132 - collection: AWARENESS_COLLECTION, 133 - rkey: this.#rkey, 134 - record: { 135 - $type: AWARENESS_COLLECTION, 136 - docId: this.#rkey, 137 - awareness: encode(update), 138 - createdAt: new Date().toISOString(), 139 - }, 140 - }, 158 + await this.#upsertRecord({ 159 + repo: this.did, 160 + collection: AWARENESS_COLLECTION, 161 + rkey: this.#rkey, 162 + record: { docId: this.#rkey, awareness: encode(update), createdAt: new Date().toISOString() }, 141 163 }); 142 - } 164 + }; 143 165 144 166 async #ensureDocument() { 145 - const rpc = this.#ownerDid === this.did ? this.rpc : await clientForDid(this.#ownerDid); 146 - const res = await rpc.get("com.atproto.repo.getRecord", { 147 - params: { repo: this.#ownerDid, collection: DOC_COLLECTION, rkey: this.#rkey }, 148 - }); 167 + const rpc = this.#ownerDid === this.did ? this.rpc : await this.#getClient(this.#ownerDid); 168 + try { 169 + const data = await this.#getRecord({ 170 + client: rpc, 171 + repo: this.#ownerDid, 172 + collection: DOC_COLLECTION, 173 + rkey: this.#rkey, 174 + }); 175 + return data.value; 176 + } catch { 177 + if (this.did !== this.#ownerDid) throw new Error("document not found"); 178 + } 149 179 150 - if (res.ok) return res.data.value; 151 - if (this.did !== this.#ownerDid) throw new Error("document not found"); 152 - 153 - const record = { 154 - $type: DOC_COLLECTION, 155 - docId: this.#rkey, 156 - editors: [], 157 - createdAt: new Date().toISOString(), 158 - }; 159 - await this.rpc.post("com.atproto.repo.putRecord", { 160 - params: {}, 161 - input: { repo: this.did, collection: DOC_COLLECTION, rkey: this.#rkey, record }, 180 + const record = { docId: this.#rkey, editors: [], createdAt: new Date().toISOString() }; 181 + await this.#upsertRecord({ 182 + repo: this.did, 183 + collection: DOC_COLLECTION, 184 + rkey: this.#rkey, 185 + record, 162 186 }); 163 187 return record; 164 188 } 165 189 166 190 async #fetchDocument() { 167 191 const rpc = this.#ownerDid === this.did ? this.rpc : await clientForDid(this.#ownerDid); 168 - const res = await rpc.get("com.atproto.repo.getRecord", { 169 - params: { repo: this.#ownerDid, collection: DOC_COLLECTION, rkey: this.#rkey }, 192 + const data = await this.#getRecord({ 193 + client: rpc, 194 + repo: this.#ownerDid, 195 + collection: DOC_COLLECTION, 196 + rkey: this.#rkey, 170 197 }); 171 - if (!res.ok) throw new Error("document not found"); 172 - return res.data.value; 198 + return data.value; 173 199 } 174 200 175 201 getMembers() { ··· 178 204 179 205 async setMembers(editors) { 180 206 const doc = await this.#fetchDocument(); 181 - await this.rpc.post("com.atproto.repo.putRecord", { 182 - input: { 183 - repo: this.did, 184 - collection: DOC_COLLECTION, 185 - rkey: this.#rkey, 186 - record: { ...doc, editors }, 187 - }, 207 + await this.#upsertRecord({ 208 + repo: this.did, 209 + collection: DOC_COLLECTION, 210 + rkey: this.#rkey, 211 + record: { ...doc, editors }, 188 212 }); 189 213 } 190 214 ··· 220 244 if (event.commit.record.docId !== this.#rkey) return; 221 245 222 246 // verify: re-fetch the record and confirm the CID matches 223 - const rpc = event.did === this.did ? this.rpc : await clientForDid(event.did); 224 - const res = await rpc.get("com.atproto.repo.getRecord", { 225 - params: { repo: event.did, collection: UPDATE_COLLECTION, rkey: event.commit.rkey }, 226 - }); 227 - if (!res.ok || res.data.cid !== event.commit.cid) return; 247 + const rpc = event.did === this.did ? this.rpc : await this.#getClient(event.did); 248 + try { 249 + const data = await this.#getRecord({ 250 + client: rpc, 251 + repo: event.did, 252 + collection: UPDATE_COLLECTION, 253 + rkey: event.commit.rkey, 254 + }); 255 + if (data.cid !== event.commit.cid) return; 256 + } catch { 257 + return; 258 + } 228 259 229 260 Y.applyUpdate(this.#ydoc, decode(event.commit.record.update), this); 230 261 } else if (event.commit.collection === AWARENESS_COLLECTION) { ··· 262 293 this.#ydoc.off("update", this.#onUpdate); 263 294 this.awareness.off("change", this.#onAwarenessChange); 264 295 this.awareness.destroy(); 265 - if (this.#cursorTimer) clearTimeout(this.#cursorTimer); 266 296 this.#ws?.close(); 267 297 } 268 298 }