a proof of concept realtime collaborative text editor using atproto as a sync server jake.tngl.io/y-pds/
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Live updates

+39 -5
+3 -3
example/client-metadata.json
··· 1 1 { 2 - "client_id": "https://1470-66-108-106-210.ngrok-free.app/client-metadata.json", 3 - "client_uri": "https://1470-66-108-106-210.ngrok-free.app", 4 - "redirect_uris": ["https://1470-66-108-106-210.ngrok-free.app/callback.html"], 2 + "client_id": "https://dc1b-66-108-106-210.ngrok-free.app/client-metadata.json", 3 + "client_uri": "https://dc1b-66-108-106-210.ngrok-free.app", 4 + "redirect_uris": ["https://dc1b-66-108-106-210.ngrok-free.app/callback.html"], 5 5 "application_type": "native", 6 6 "client_name": "atrtc demo", 7 7 "dpop_bound_access_tokens": true,
+36 -2
example/y-pds.js
··· 24 24 return `at://${did}/${DOC_COLLECTION}/${crypto.randomUUID()}`; 25 25 } 26 26 27 + const JETSTREAM = "wss://jetstream2.us-east.bsky.network/subscribe"; 28 + 27 29 export class YPdsProvider { 28 30 /** @type {Y.Doc} */ 29 31 #ydoc; 30 32 31 33 #ownerDid = ""; 32 34 #rkey = ""; 35 + 36 + /** @type {WebSocket | null} */ 37 + #ws = null; 33 38 34 39 /** 35 40 * @param {Y.Doc} ydoc ··· 61 66 }); 62 67 63 68 this.#ydoc.on("update", this.#onUpdate); 69 + this.#subscribe(new Set(repos)); 64 70 } 65 71 66 72 async #fetchUpdates(repo) { ··· 79 85 return records.filter(r => r.value.docId === this.#rkey); 80 86 } 81 87 82 - /** @param {Uint8Array} update */ 83 - #onUpdate = async update => { 88 + /** @param {Uint8Array} update @param {unknown} origin */ 89 + #onUpdate = async (update, origin) => { 90 + if (origin === this) return; 84 91 await this.rpc.post("com.atproto.repo.createRecord", { 85 92 params: {}, 86 93 input: { ··· 139 146 }); 140 147 } 141 148 149 + /** @param {Set<string>} repos */ 150 + #subscribe(repos) { 151 + const url = new URL(JETSTREAM); 152 + url.searchParams.append("wantedCollections", COLLECTION); 153 + for (const repo of repos) url.searchParams.append("wantedDids", repo); 154 + 155 + this.#ws = new WebSocket(url); 156 + this.#ws.onmessage = async e => { 157 + const event = JSON.parse(e.data); 158 + if (event.kind !== "commit") return; 159 + if (event.commit.operation !== "create") return; 160 + if (event.commit.collection !== COLLECTION) return; 161 + if (!repos.has(event.did)) return; 162 + if (event.commit.record.docId !== this.#rkey) return; 163 + 164 + // verify: re-fetch the record and confirm the CID matches 165 + const rpc = event.did === this.did ? this.rpc : await clientForDid(event.did); 166 + const res = await rpc.get("com.atproto.repo.getRecord", { 167 + params: { repo: event.did, collection: COLLECTION, rkey: event.commit.rkey }, 168 + }); 169 + if (!res.ok || res.data.cid !== event.commit.cid) return; 170 + 171 + Y.applyUpdate(this.#ydoc, decode(event.commit.record.update), this); 172 + }; 173 + } 174 + 142 175 destroy() { 143 176 this.#ydoc.off("update", this.#onUpdate); 177 + this.#ws?.close(); 144 178 } 145 179 }