A music player that connects to your cloud/distributed storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: atproto, listen to firehose for changes

+101 -3
+2
deno.jsonc
··· 4 4 "vendor": true, 5 5 "imports": { 6 6 "98.css": "npm:98.css@^0.1.21", 7 + "@atcute/atproto": "npm:@atcute/atproto@^3.1.10", 7 8 "@atcute/car": "npm:@atcute/car@^5.1.1", 8 9 "@atcute/cbor": "npm:@atcute/cbor@^2.3.2", 9 10 "@atcute/cid": "npm:@atcute/cid@^2.4.1", 10 11 "@atcute/tid": "npm:@atcute/tid@^1.1.2", 11 12 "@atcute/client": "npm:@atcute/client@^4.2.1", 13 + "@atcute/firehose": "npm:@atcute/firehose@^0.1.0", 12 14 "@atcute/repo": "npm:@atcute/repo@^0.1.3", 13 15 "@atcute/identity-resolver": "npm:@atcute/identity-resolver@^1.2.2", 14 16 "@atcute/lexicons": "npm:@atcute/lexicons@^1.2.9",
+99 -3
src/components/output/raw/atproto/element.js
··· 1 1 import { Client, ClientResponseError, ok } from "@atcute/client"; 2 + import { ComAtprotoSyncSubscribeRepos } from "@atcute/atproto"; 2 3 import { encode } from "@atcute/cbor"; 3 4 import { xxh32r } from "xxh32/dist/raw.js"; 4 5 import * as Repo from "@atcute/repo"; ··· 28 29 // ELEMENT 29 30 //////////////////////////////////////////// 30 31 32 + /** @type {Set<string>} */ 33 + const WATCHED_COLLECTIONS = new Set([ 34 + "sh.diffuse.output.facet", 35 + "sh.diffuse.output.playlistItem", 36 + "sh.diffuse.output.theme", 37 + "sh.diffuse.output.trackBundle", 38 + ]); 39 + 31 40 /** 32 41 * @implements {ATProtoOutputElement} 33 42 */ ··· 45 54 /** @type {OAuthUserAgent | null} */ 46 55 #agent = null; 47 56 57 + /** @type {string | null} */ 58 + #pdsUrl = null; 59 + 60 + /** @type {AsyncIterator<any> | null} */ 61 + #firehoseIterator = null; 62 + #firehoseGen = 0; 63 + 48 64 constructor() { 49 65 super(); 50 66 ··· 109 125 110 126 // SIGNALS 111 127 112 - #did = signal(/** @type {string | null} */ (null)); 128 + #did = signal(/** @type {`did:${string}:${string}` | null} */ (null)); 113 129 #isOnline = signal(navigator.onLine); 114 130 #rev = signal(/** @type {string | null} */ (null)); 115 131 ··· 136 152 137 153 /** @override */ 138 154 disconnectedCallback() { 155 + this.#stopFirehose(); 139 156 globalThis.removeEventListener("online", this.#online); 140 157 globalThis.removeEventListener("offline", this.#offline); 141 158 } ··· 160 177 */ 161 178 async logout() { 162 179 if (this.#agent) { 180 + this.#stopFirehose(); 163 181 await logout(this.#agent); 164 182 this.#agent = null; 165 183 this.#authenticated = Promise.withResolvers(); 166 184 this.#did.value = null; 185 + this.#pdsUrl = null; 167 186 this.#rpc = null; 168 187 } 169 188 } ··· 173 192 * Used when the session has already been revoked. 174 193 */ 175 194 #clearSession() { 195 + this.#stopFirehose(); 176 196 this.#agent = null; 177 197 this.#authenticated = Promise.withResolvers(); 178 198 this.#did.value = null; 199 + this.#pdsUrl = null; 179 200 this.#rpc = null; 180 201 181 202 clearStoredSession(); ··· 239 260 this.#agent = agent; 240 261 this.#rpc = new Client({ handler: agent }); 241 262 this.#did.value = session.info.sub; 263 + this.#pdsUrl = session.info.aud; 242 264 this.#authenticated.resolve(); 265 + this.#startFirehose(); 266 + } 267 + 268 + // FIREHOSE 269 + 270 + #stopFirehose() { 271 + const iter = this.#firehoseIterator; 272 + this.#firehoseIterator = null; 273 + iter?.return?.(); 274 + } 275 + 276 + async #startFirehose() { 277 + this.#stopFirehose(); 278 + 279 + const gen = ++this.#firehoseGen; 280 + const pdsUrl = this.#pdsUrl; 281 + if (!pdsUrl) return; 282 + 283 + const wssUrl = pdsUrl.replace( 284 + /^https?:\/\//, 285 + (m) => m === "https://" ? "wss://" : "ws://", 286 + ); 287 + 288 + const { FirehoseSubscription } = await import("@atcute/firehose"); 289 + 290 + // Abort if superseded while awaiting the import 291 + if (this.#firehoseGen !== gen) return; 292 + 293 + const subscription = new FirehoseSubscription({ 294 + service: wssUrl, 295 + nsid: ComAtprotoSyncSubscribeRepos.mainSchema, 296 + validateMessages: false, 297 + }); 298 + 299 + const iter = subscription[Symbol.asyncIterator](); 300 + this.#firehoseIterator = iter; 301 + 302 + try { 303 + for await (const message of iter) { 304 + this.#handleFirehoseCommit(message); 305 + } 306 + } catch { 307 + // Non-fatal; partysocket handles reconnection automatically 308 + } 309 + } 310 + 311 + /** 312 + * @param {any} message 313 + */ 314 + #handleFirehoseCommit(message) { 315 + if (message.$type !== "com.atproto.sync.subscribeRepos#commit") return; 316 + if (message.repo !== this.#did.value) return; 317 + 318 + // Skip commits we made ourselves (rev already reflects current state) 319 + if (message.rev === this.#rev.value) return; 320 + 321 + const touched = new Set( 322 + (message.ops ?? []) 323 + .map((/** @type {any} */ op) => op.path?.split("/")[0]) 324 + .filter((/** @type {string} */ c) => WATCHED_COLLECTIONS.has(c)), 325 + ); 326 + 327 + if (touched.size === 0) return; 328 + 329 + if (touched.has("sh.diffuse.output.facet")) this.#manager.facets.reload(); 330 + if (touched.has("sh.diffuse.output.playlistItem")) { 331 + this.#manager.playlistItems.reload(); 332 + } 333 + if (touched.has("sh.diffuse.output.theme")) this.#manager.themes.reload(); 334 + if (touched.has("sh.diffuse.output.trackBundle")) { 335 + this.#manager.tracks.reload(); 336 + } 243 337 } 244 338 245 339 // RECORDS ··· 350 444 { deleteBatchSize = 100, upsertBatchSize = deleteBatchSize } = {}, 351 445 ) { 352 446 const rpc = this.#rpc; 353 - if (!rpc || !this.#did.value) return; 447 + const did = this.#did.value; 448 + 449 + if (!rpc || !did) return; 354 450 355 451 try { 356 452 // 1. Fetch current state ··· 465 561 } 466 562 467 563 const result = await ok(rpc.post("com.atproto.repo.applyWrites", { 468 - input: { repo: this.#did.value, writes: batch }, 564 + input: { repo: did, writes: batch }, 469 565 })); 470 566 471 567 const writtenIds = batch.map((op) => op.rkey ?? op.value?.id).filter(