a proof of concept realtime collaborative text editor using atproto as a sync server jake.tngl.io/y-pds/
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Simplify y-pds.js EVEN MORE

+102 -96
+102 -96
y-pds.js
··· 9 9 const encode = (/** @type {Uint8Array} */ update) => btoa(String.fromCharCode(...update)); 10 10 const decode = (/** @type {string} */ b64) => Uint8Array.from(atob(b64), c => c.charCodeAt(0)); 11 11 12 - const JETSTREAM = "wss://jetstream2.us-east.bsky.network/subscribe"; 12 + export class YPdsProvider { 13 + /** @type {Y.Doc} */ 14 + #ydoc; 13 15 14 - export class YPdsProvider { 15 16 /** @type {Map<string, Client>} */ 16 17 #clients = new Map(); 17 18 ··· 20 21 let client = this.#clients.get(did); 21 22 if (client) return client; 22 23 24 + // fetch the did document 23 25 const url = did.startsWith("did:web:") 24 - ? `https://${did.slice("did:web:".length)}/.well-known/did.json` 25 - : `https://plc.directory/${did}`; 26 - 27 - const res = await fetch(url), 26 + ? `https://${did.slice("did:web:".length)}/.well-known/did.json` 27 + : `https://plc.directory/${did}`, 28 + res = await fetch(url), 28 29 doc = await res.json(); 29 30 31 + // find the PDS URL 30 32 const pds = doc.service?.find(s => s.type === "AtprotoPersonalDataServer")?.serviceEndpoint; 31 33 if (!pds) throw new Error(`no PDS found for ${did}`); 32 34 35 + // create a client for that PDS 33 36 client = new Client({ handler: simpleFetchHandler({ service: pds }) }); 34 37 35 38 this.#clients.set(did, client); 36 39 return client; 37 40 } 38 41 39 - /** @type {Y.Doc} */ 40 - #ydoc; 41 - 42 - #ownerDid = ""; 43 - #rkey = ""; 44 - 45 - /** @type {WebSocket | null} */ 46 - #ws = null; 47 - 48 - #jetstream = JETSTREAM; 49 - 50 - /** @type {{ editors: string[], [key: string]: unknown } | null} */ 51 - #doc = null; 52 - 53 - #cursor = 0; 54 - #destroyed = false; 55 - 56 - /** 57 - * @param {Y.Doc} ydoc 58 - * @param {string} atUri at://ownerDid/collection/rkey 59 - * @param {{ rpc: Client, did: string, awareness?: Awareness, jetstream?: string }} options 60 - */ 61 - constructor(ydoc, atUri, { rpc, did, awareness, jetstream = JETSTREAM }) { 62 - this.#clients.set(did, rpc); 63 - 64 - this.#ydoc = ydoc; 65 - const [ownerDid, , rkey] = atUri.slice("at://".length).split("/"); 66 - this.#ownerDid = ownerDid; 67 - this.#rkey = rkey; 68 - this.did = did; 69 - this.awareness = awareness ?? new Awareness(ydoc); 70 - this.#jetstream = jetstream; 71 - 72 - this.awareness.on("change", this.#onAwarenessChange); 73 - this.#ydoc.on("update", this.#onUpdate); 74 - } 75 - 76 42 /** 77 43 * @params {object} options 78 44 * @params {string} [options.rkey] ··· 126 92 return res.data; 127 93 } 128 94 95 + #ownerDid = ""; 96 + #rkey = ""; 97 + 98 + /** @type {WebSocket | null} */ 99 + #ws = null; 100 + 101 + #jetstream = "wss://jetstream2.us-east.bsky.network/subscribe"; 102 + 103 + /** @type {{ editors: string[], [key: string]: unknown } | null} */ 104 + #doc = null; 105 + 106 + #cursor = 0; 107 + #destroyed = false; 108 + 109 + /** 110 + * @param {Y.Doc} ydoc 111 + * @param {string} atUri at://ownerDid/collection/rkey 112 + * @param {{ rpc: Client, did: string, awareness?: Awareness, jetstream?: string }} options 113 + */ 114 + constructor(ydoc, atUri, { rpc, did, awareness, jetstream }) { 115 + this.#clients.set(did, rpc); 116 + this.#ydoc = ydoc; 117 + 118 + const [ownerDid, , rkey] = atUri.slice("at://".length).split("/"); 119 + this.#ownerDid = ownerDid; 120 + this.#rkey = rkey; 121 + this.did = did; 122 + this.awareness = awareness ?? new Awareness(ydoc); 123 + this.#jetstream = jetstream ?? this.#jetstream; 124 + 125 + this.awareness.on("change", this.#onAwarenessChange); 126 + this.#ydoc.on("update", this.#onUpdate); 127 + } 128 + 129 129 async load() { 130 - this.#doc = await this.#ensureDocument(); 130 + try { 131 + const data = await this.#getRecord({ 132 + repo: this.#ownerDid, 133 + collection: DOC_COLLECTION, 134 + rkey: this.#rkey, 135 + }); 136 + this.#doc = data.value; 137 + } catch { 138 + // if the fetch fails and the document is not in our own repo, 139 + // throw an error rather than attempting to create it 140 + if (this.did !== this.#ownerDid) throw new Error("Document not found"); 131 141 142 + // create the document 143 + this.#doc = { docId: this.#rkey, editors: [], createdAt: new Date().toISOString() }; 144 + await this.#upsertRecord({ 145 + repo: this.did, 146 + collection: DOC_COLLECTION, 147 + rkey: this.#rkey, 148 + record: this.#doc, 149 + }); 150 + } 151 + 152 + // fetch updates from the owner and all editors 132 153 const repos = [this.#ownerDid].concat(this.#doc.editors); 133 154 const updates = await Promise.all(repos.map(repo => this.#fetchUpdates(repo))); 134 155 135 - const sorted = updates 136 - .flat() 137 - .sort((a, b) => a.value.createdAt.localeCompare(b.value.createdAt)); 156 + // apply all the updates 138 157 Y.transact(this.#ydoc, () => { 139 - for (const { value } of sorted) { 158 + for (const { value } of updates.flat()) { 140 159 Y.applyUpdate(this.#ydoc, decode(value.update)); 141 160 } 142 161 }); ··· 185 204 }); 186 205 }; 187 206 188 - async #ensureDocument() { 189 - try { 190 - const data = await this.#getRecord({ 191 - repo: this.#ownerDid, 192 - collection: DOC_COLLECTION, 193 - rkey: this.#rkey, 194 - }); 195 - return data.value; 196 - } catch { 197 - if (this.did !== this.#ownerDid) throw new Error("document not found"); 198 - } 199 - 200 - const record = { docId: this.#rkey, editors: [], createdAt: new Date().toISOString() }; 201 - await this.#upsertRecord({ 202 - repo: this.did, 203 - collection: DOC_COLLECTION, 204 - rkey: this.#rkey, 205 - record, 206 - }); 207 - return record; 208 - } 209 - 210 207 getMembers() { 211 208 return this.#doc.editors; 212 209 } 213 210 214 211 async setMembers(editors) { 212 + this.#doc = { ...this.#doc, editors }; 215 213 await this.#upsertRecord({ 216 214 repo: this.did, 217 215 collection: DOC_COLLECTION, 218 216 rkey: this.#rkey, 219 - record: { ...this.#doc, editors }, 217 + record: this.#doc, 220 218 }); 221 - this.#doc = { ...this.#doc, editors }; 222 219 } 223 220 224 221 #subscribe() { ··· 238 235 if (!this.#destroyed) this.#subscribe(); 239 236 }, 1000); 240 237 }; 238 + 241 239 this.#ws.onmessage = async e => { 242 240 const event = JSON.parse(e.data); 243 241 this.#cursor = event.time_us; 244 242 if (event.kind !== "commit") return; 245 243 246 - if (event.commit.collection === DOC_COLLECTION) { 247 - if (event.did !== this.#ownerDid || event.commit.rkey !== this.#rkey) return; 248 - await this.load(); 249 - return; 250 - } 244 + switch (event.commit.collection) { 245 + case DOC_COLLECTION: { 246 + if (event.did !== this.#ownerDid || event.commit.rkey !== this.#rkey) return; 247 + await this.load(); 248 + break; 249 + } 250 + 251 + case UPDATE_COLLECTION: { 252 + if (event.commit.record.docId !== this.#rkey) return; 253 + if (event.did !== this.#ownerDid && !this.#doc.editors.includes(event.did)) return; 251 254 252 - if (event.did !== this.#ownerDid && !this.#doc.editors.includes(event.did)) return; 255 + if (event.commit.operation !== "create") return; 253 256 254 - if (event.commit.collection === UPDATE_COLLECTION) { 255 - if (event.commit.operation !== "create") return; 256 - if (event.commit.record.docId !== this.#rkey) return; 257 + // verify: re-fetch the record and confirm the CID matches 258 + try { 259 + const data = await this.#getRecord({ 260 + repo: event.did, 261 + collection: UPDATE_COLLECTION, 262 + rkey: event.commit.rkey, 263 + }); 264 + if (data.cid !== event.commit.cid) return; 265 + } catch { 266 + return; 267 + } 257 268 258 - // verify: re-fetch the record and confirm the CID matches 259 - try { 260 - const data = await this.#getRecord({ 261 - repo: event.did, 262 - collection: UPDATE_COLLECTION, 263 - rkey: event.commit.rkey, 264 - }); 265 - if (data.cid !== event.commit.cid) return; 266 - } catch { 267 - return; 269 + Y.applyUpdate(this.#ydoc, decode(event.commit.record.update), this); 270 + break; 268 271 } 269 272 270 - Y.applyUpdate(this.#ydoc, decode(event.commit.record.update), this); 271 - } else if (event.commit.collection === AWARENESS_COLLECTION) { 272 - if (event.did === this.did) return; 273 - if (event.commit.record.docId !== this.#rkey) return; 274 - applyAwarenessUpdate(this.awareness, decode(event.commit.record.awareness), "remote"); 273 + case AWARENESS_COLLECTION: { 274 + if (event.commit.record.docId !== this.#rkey) return; 275 + if (event.did !== this.#ownerDid && !this.#doc.editors.includes(event.did)) return; 276 + 277 + if (event.did === this.did) return; 278 + applyAwarenessUpdate(this.awareness, decode(event.commit.record.awareness), "remote"); 279 + break; 280 + } 275 281 } 276 282 }; 277 283 }