A music player that connects to your cloud/distributed storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v4 705 lines 19 kB view raw
import { Client, ClientResponseError, ok } from "@atcute/client";
import { ComAtprotoSyncSubscribeRepos } from "@atcute/atproto";
import { decode, encode } from "@atcute/cbor";
import { xxh32r } from "xxh32/dist/raw.js";
import * as IDB from "idb-keyval";

import { computed, signal } from "~/common/signal.js";
import { BroadcastedOutputElement, outputManager } from "../../common.js";
import { defineElement } from "~/common/element.js";

import {
  clearStoredSession,
  login,
  logout,
  OAuthUserAgent,
  restoreOrFinalize,
  TokenRefreshError,
} from "./oauth.js";

/**
 * @import {Track, TrackBundle} from "~/definitions/types.d.ts"
 * @import {OutputManager} from "../../types.d.ts"
 * @import {ATProtoOutputElement} from "./types.d.ts"
 */

////////////////////////////////////////////
// ELEMENT
////////////////////////////////////////////

/** Length of the sliding window used for write throttling (one hour). */
const WRITE_WINDOW_MS = 3_600_000;

/** Maximum number of write operations allowed per `WRITE_WINDOW_MS`. */
const WRITE_RATE_LIMIT = 1500;

/** IndexedDB key under which the timestamps of recent writes are stored. */
const WRITE_IDB_KEY = "diffuse/output/raw/atproto/writes";

/**
 * Collections whose firehose commits trigger a reload of local data.
 *
 * @type {Set<string>}
 */
const WATCHED_COLLECTIONS = new Set([
  "sh.diffuse.output.facet",
  "sh.diffuse.output.playlistItem",
  "sh.diffuse.output.setting",
  "sh.diffuse.output.trackBundle",
]);

/**
 * Output element that stores facets, playlist items, settings and tracks as
 * records (and, for tracks, a CBOR blob) in an AT Protocol repository.
 *
 * Writes are serialised through an internal queue and throttled to
 * `WRITE_RATE_LIMIT` operations per `WRITE_WINDOW_MS`. A firehose subscription
 * reloads the affected collections when the repo is changed by someone else.
 *
 * @implements {ATProtoOutputElement}
 */
class ATProtoOutput extends BroadcastedOutputElement {
  static NAME = "diffuse/output/raw/atproto";

  #manager;

  /** @type {PromiseWithResolvers<void>} */
  #authenticated = Promise.withResolvers();

  /** @type {Client | null} */
  #rpc = null;

  /** @type {OAuthUserAgent | null} */
  #agent = null;

  /** @type {string | null} */
  #pdsUrl = null;

  /** @type {AsyncIterator<any> | null} */
  #firehoseIterator = null;

  // Monotonic counter; every start/stop bumps it so that an in-flight
  // #startFirehose() can tell it has been superseded.
  #firehoseGen = 0;

  constructor() {
    super();

    // Last track list known to be stored remotely; lets `put` skip no-op writes.
    /** @type {Track[] | null} */
    let lastPersistedTracks = null;

    /** @type {OutputManager} */
    this.#manager = outputManager({
      facets: {
        empty: () => [],
        get: () => this.listRecords("sh.diffuse.output.facet"),
        put: (data) => this.putRecords("sh.diffuse.output.facet", data),
      },
      playlistItems: {
        empty: () => [],
        get: () => this.listRecords("sh.diffuse.output.playlistItem"),
        put: (data) => this.putRecords("sh.diffuse.output.playlistItem", data),
      },
      settings: {
        empty: () => [],
        get: () => this.listRecords("sh.diffuse.output.setting"),
        put: (data) => this.putRecords("sh.diffuse.output.setting", data),
      },
      tracks: {
        empty: () => [],
        get: async () => {
          const bundles = await this.listRecords(
            "sh.diffuse.output.trackBundle",
          );

          /** @type {Track[]} */
          const tracks = [];

          for (const bundle of bundles) {
            if (!bundle.data?.ref?.$link) continue;
            const bytes = await this.#fetchBlob(bundle.data.ref.$link);

            // Append one by one: spreading a very large decoded array into
            // push() can exceed the engine's argument-count limit.
            for (const track of decode(bytes)) {
              tracks.push(track);
            }
          }

          lastPersistedTracks = tracks;
          return tracks;
        },
        put: async (data) => {
          // Encode and hash the new data once; the hash doubles as bundle id.
          const bytes = encode(data);
          const hashNew = xxh32r(bytes);
          const hashCurrent = xxh32r(encode(lastPersistedTracks ?? []));

          if (hashCurrent === hashNew) {
            return;
          }

          const blob = await this.#uploadBlob(bytes);

          // Not authenticated: nothing was uploaded, so do not mark the data
          // as persisted (a later identical put must still go through).
          if (!blob) return;

          /** @type {TrackBundle} */
          const bundle = {
            $type: "sh.diffuse.output.trackBundle",
            id: hashNew.toString(16),
            data: blob,
          };

          await this.putRecords("sh.diffuse.output.trackBundle", [bundle]);
          lastPersistedTracks = data;
        },
      },
    });

    this.facets = this.#manager.facets;
    this.playlistItems = this.#manager.playlistItems;
    this.settings = this.#manager.settings;
    this.tracks = this.#manager.tracks;
  }

  // SIGNALS

  #did = signal(/** @type {`did:${string}:${string}` | null} */ (null));
  #handle = signal(/** @type {string | null} */ (null));
  #isOnline = signal(navigator.onLine);
  #rev = signal(/** @type {string | null} */ (null));
  #revFetchedAt = 0;

  // Revs of commits produced by our own applyWrites calls.
  #ownRevs = new Set();

  // Number of #doPutRecords() calls currently in progress.
  #writing = 0;

  /** @type {Array<{ fn: () => Promise<void>, resolve: () => void, reject: (err: unknown) => void }>} */
  #writeQueue = [];
  #writeDraining = false;

  // Per-collection cancellation token of the most recent putRecords() call.
  /** @type {Map<string, { cancelled: boolean }>} */
  #writeCancels = new Map();

  did = this.#did.get;
  handle = this.#handle.get;
  rev = this.#rev.get;

  /** True when a DID is set, an RPC client exists and the browser is online. */
  ready = computed(() => {
    return this.#did.value !== null && !!this.#rpc && this.#isOnline.value;
  });

  // LIFECYCLE

  /** @override */
  connectedCallback() {
    this.replicateSavedData(this.#manager);

    super.connectedCallback();

    this.#tryRestore();

    globalThis.addEventListener("online", this.#online);
    globalThis.addEventListener("offline", this.#offline);
  }

  /** @override */
  disconnectedCallback() {
    this.#stopFirehose();
    globalThis.removeEventListener("online", this.#online);
    globalThis.removeEventListener("offline", this.#offline);
  }

  #offline = () => this.#isOnline.set(false);
  #online = () => this.#isOnline.set(true);

  // AUTH

  /**
   * Initiate the OAuth flow.
   * Navigates the browser to the authorization server.
   *
   * @param {string} handle
   */
  async login(handle) {
    await login(handle);
  }

  /**
   * Sign out and revoke the current session.
   * Does nothing when there is no active agent.
   */
  async logout() {
    if (this.#agent) {
      this.#stopFirehose();
      await logout(this.#agent);
      this.#agent = null;
      this.#authenticated = Promise.withResolvers();
      this.#did.value = null;
      this.#handle.value = null;
      this.#pdsUrl = null;
      this.#rpc = null;
    }
  }

  /**
   * Clear session state without contacting the server.
   * Used when the session has already been revoked.
   */
  #clearSession() {
    this.#stopFirehose();
    this.#agent = null;
    this.#authenticated = Promise.withResolvers();
    this.#did.value = null;
    this.#handle.value = null;
    this.#pdsUrl = null;
    this.#rpc = null;

    clearStoredSession();
  }

  /**
   * Whether `err` (or anything in its `cause` chain) indicates that the
   * session is no longer valid.
   *
   * @param {unknown} err
   * @returns {boolean}
   */
  #isSessionError(err) {
    if (err instanceof TokenRefreshError) return true;
    // OAuthUserAgent.handle() swallows TokenRefreshError and returns the
    // original 401 response, which ok() wraps as a ClientResponseError.
    if (err instanceof ClientResponseError && err.status === 401) return true;
    if (err && typeof err === "object" && "cause" in err) {
      return this.#isSessionError(/** @type {any} */ (err).cause);
    }
    return false;
  }

  /**
   * Restore a stored session (or finalize a pending OAuth redirect) once the
   * element is connected. Session errors clear the local state; any other
   * error is rethrown.
   */
  async #tryRestore() {
    await this.whenConnected();

    try {
      const session = await restoreOrFinalize();

      if (session) {
        this.#setSession(session);
      }
    } catch (err) {
      if (this.#isSessionError(err)) {
        this.#clearSession();
      } else {
        throw err;
      }
    }
  }

  /**
   * Install `session` as the active session: build the agent and RPC client,
   * publish the DID, start the firehose and look up the handle.
   *
   * @param {import("@atcute/oauth-browser-client").Session} session
   */
  #setSession(session) {
    const agent = new OAuthUserAgent(session);

    // Intercept token refresh to detect session revocation proactively.
    // OAuthUserAgent.handle() swallows TokenRefreshError silently,
    // so we hook into getSession to clear state as soon as refresh fails.
    const originalGetSession = agent.getSession.bind(agent);
    agent.getSession = /** @param {any[]} args */ (...args) => {
      const promise = originalGetSession(...args);

      promise.catch((err) => {
        if (err instanceof TokenRefreshError) {
          this.#clearSession();
        }
      });

      return promise;
    };

    this.#agent = agent;
    this.#rpc = new Client({ handler: agent });
    this.#did.value = session.info.sub;
    this.#pdsUrl = session.info.aud;
    this.#authenticated.resolve();
    this.#startFirehose();
    this.#fetchHandle(session.info.sub);
  }

  /**
   * Look up the handle for `did` via `describeRepo` and publish it, unless
   * the active DID changed while the request was in flight.
   *
   * @param {string} did
   */
  async #fetchHandle(did) {
    const rpc = this.#rpc;
    if (!rpc) return;
    try {
      const result = await ok(rpc.get("com.atproto.repo.describeRepo", {
        params: {
          repo: /** @type {import("@atcute/lexicons").ActorIdentifier} */ (did),
        },
      }));
      if (this.#did.value === did) {
        this.#handle.value = result.handle ?? null;
      }
    } catch {
      // Non-fatal; handle stays null
    }
  }

  // FIREHOSE

  /**
   * Stop the current firehose subscription, if any, and cancel a start that
   * is still waiting for its dynamic import.
   */
  #stopFirehose() {
    // Bump the generation so a #startFirehose() call that is still awaiting
    // the module import sees it has been superseded and bails out.
    this.#firehoseGen++;

    const iter = this.#firehoseIterator;
    this.#firehoseIterator = null;
    iter?.return?.();
  }

  /**
   * (Re)subscribe to `com.atproto.sync.subscribeRepos` on the session's PDS
   * and feed every message to `#handleFirehoseCommit`.
   */
  async #startFirehose() {
    this.#stopFirehose();

    const gen = ++this.#firehoseGen;
    const pdsUrl = this.#pdsUrl;
    if (!pdsUrl) return;

    const wssUrl = pdsUrl.replace(
      /^https?:\/\//,
      (m) => m === "https://" ? "wss://" : "ws://",
    );

    const { FirehoseSubscription } = await import("@atcute/firehose");

    // Abort if superseded (restarted or stopped) while awaiting the import
    if (this.#firehoseGen !== gen) return;

    const subscription = new FirehoseSubscription({
      service: wssUrl,
      nsid: ComAtprotoSyncSubscribeRepos.mainSchema,
      validateMessages: false,
    });

    const iter = subscription[Symbol.asyncIterator]();
    this.#firehoseIterator = iter;

    try {
      for await (const message of iter) {
        this.#handleFirehoseCommit(message);
      }
    } catch {
      // Non-fatal; partysocket handles reconnection automatically
    } finally {
      // Drop our reference once the stream has ended, unless a newer
      // subscription has already replaced it.
      if (this.#firehoseIterator === iter) {
        this.#firehoseIterator = null;
      }
    }
  }

  /**
   * React to a firehose message: for commits to our own repo that we did not
   * make ourselves, reload every watched collection the commit touched.
   *
   * @param {any} message
   */
  #handleFirehoseCommit(message) {
    if (message.$type !== "com.atproto.sync.subscribeRepos#commit") return;
    if (message.repo !== this.#did.value) return;

    // Skip commits we made ourselves (all intermediate revs, not just the last)
    if (this.#ownRevs.delete(message.rev)) return;

    const touched = new Set(
      (message.ops ?? [])
        .map((/** @type {any} */ op) => op.path?.split("/")[0])
        .filter((/** @type {string} */ c) => WATCHED_COLLECTIONS.has(c)),
    );

    if (touched.size === 0) return;

    // Ignore the commit while one of our own writes is in progress.
    if (this.#writing > 0) return;

    if (touched.has("sh.diffuse.output.facet")) this.#manager.facets.reload();
    if (touched.has("sh.diffuse.output.playlistItem")) {
      this.#manager.playlistItems.reload();
    }
    if (touched.has("sh.diffuse.output.setting")) {
      this.#manager.settings.reload();
    }
    if (touched.has("sh.diffuse.output.trackBundle")) {
      this.#manager.tracks.reload();
    }
  }

  // RECORDS

  /**
   * Fetch the latest commit rev for this repo.
   * Returns `null` if not authenticated or when the session turned out to be
   * invalid; other errors are rethrown.
   *
   * @returns {Promise<string | null>}
   */
  async getLatestCommit() {
    const did = this.#did.value;

    const rpc = this.#rpc;
    if (!rpc || !did) return null;

    try {
      const result = await ok(rpc.get(
        "com.atproto.sync.getLatestCommit",
        { params: { did } },
      ));

      this.#rev.value = result?.rev;
      this.#revFetchedAt = Date.now();
      return result?.rev;
    } catch (err) {
      if (this.#isSessionError(err)) {
        this.#clearSession();
        return null;
      }

      throw err;
    }
  }

  /**
   * Upload `bytes` as a blob.
   * Resolves to the blob reference, or `undefined` when there is no RPC client.
   *
   * @param {Uint8Array} bytes
   * @returns {Promise<any>}
   */
  async #uploadBlob(bytes) {
    const rpc = this.#rpc;
    if (!rpc) return;
    const result = await ok(rpc.post("com.atproto.repo.uploadBlob", {
      input: bytes,
      headers: { "content-type": "application/octet-stream" },
    }));
    return result.blob;
  }

  /**
   * Download the blob identified by `cid` from the current repo.
   * Resolves to an empty array when not authenticated.
   *
   * @param {string} cid
   * @returns {Promise<Uint8Array>}
   */
  async #fetchBlob(cid) {
    const rpc = this.#rpc;
    const did = this.#did.value;
    if (!rpc || !did) return new Uint8Array();
    return await ok(rpc.get("com.atproto.sync.getBlob", {
      params: { did, cid },
      as: "bytes",
    }));
  }

  /**
   * List the values of all records in `collection`, following pagination.
   * Resolves to `[]` when not authenticated or when the session turned out to
   * be invalid; other errors are rethrown.
   *
   * @template T
   * @param {string} collection
   * @param {string} [did] Repo to read from; defaults to the current DID.
   * @returns {Promise<T[]>}
   */
  async listRecords(collection, did) {
    did ??= this.#did.value ?? undefined;

    // Capture the client once: the session may be cleared (and #rpc nulled)
    // while we are still paging.
    const rpc = this.#rpc;
    if (!rpc || !did) return [];

    try {
      const records = [];
      /** @type {string | undefined} */
      let cursor;
      do {
        const page = await ok(rpc.get("com.atproto.repo.listRecords", {
          params: {
            repo:
              /** @type {import("@atcute/lexicons").ActorIdentifier} */ (did),
            collection:
              /** @type {`${string}.${string}.${string}`} */ (collection),
            limit: 100,
            cursor,
          },
        }));
        records.push(...page.records.map((r) => /** @type {T} */ (r.value)));
        cursor = page.cursor;
      } while (cursor);
      return records;
    } catch (err) {
      if (this.#isSessionError(err)) {
        this.#clearSession();
        return [];
      }

      throw err;
    }
  }

  // WRITE QUEUE

  /**
   * Load the recorded writes that still fall inside the rate-limit window.
   *
   * @returns {Promise<{ id: string, ts: number }[]>}
   */
  async #loadWriteWindow() {
    const now = Date.now();
    const all = /** @type {{ id: string, ts: number }[]} */ (
      await IDB.get(WRITE_IDB_KEY) ?? []
    );
    return all.filter((e) => now - e.ts < WRITE_WINDOW_MS);
  }

  /**
   * Append `ids` to the stored write window, stamped with the current time.
   *
   * @param {string[]} ids
   */
  async #recordWritten(ids) {
    const now = Date.now();
    const window = await this.#loadWriteWindow();
    await IDB.set(WRITE_IDB_KEY, [
      ...window,
      ...ids.map((id) => ({ id, ts: now })),
    ]);
  }

  /**
   * Queue `fn` to run after all previously queued writes.
   * The returned promise settles with the outcome of `fn`.
   *
   * @param {() => Promise<void>} fn
   * @returns {Promise<void>}
   */
  #enqueueWrite(fn) {
    return new Promise((resolve, reject) => {
      this.#writeQueue.push({ fn, resolve, reject });
      this.#drainWrites();
    });
  }

  /**
   * Run queued writes one at a time until the queue is empty.
   * Re-entrant calls return immediately while a drain is in progress.
   */
  async #drainWrites() {
    if (this.#writeDraining) return;
    this.#writeDraining = true;

    while (this.#writeQueue.length > 0) {
      const { fn, resolve, reject } =
        /** @type {{ fn: () => Promise<void>, resolve: () => void, reject: (err: unknown) => void }} */ (
          this.#writeQueue.shift()
        );
      try {
        await fn();
        resolve();
      } catch (err) {
        reject(err);
      }
    }

    this.#writeDraining = false;
  }

  /**
   * Make the remote `collection` match `data`: records missing from `data`
   * are deleted, new ones created and changed ones updated.
   *
   * A newer call for the same collection supersedes an older one that is
   * still queued or running. Does nothing when not authenticated.
   *
   * @param {string} collection
   * @param {Array<{ id: string }>} data
   * @param {{ deleteBatchSize?: number, upsertBatchSize?: number }} [options]
   */
  async putRecords(
    collection,
    data,
    { deleteBatchSize = 100, upsertBatchSize = deleteBatchSize } = {},
  ) {
    if (!this.#rpc || !this.#did.value) return;

    // Supersede any prior write for this collection
    const prior = this.#writeCancels.get(collection);
    if (prior) prior.cancelled = true;

    const token = { cancelled: false };
    this.#writeCancels.set(collection, token);

    return this.#enqueueWrite(async () => {
      if (token.cancelled) return;
      try {
        await this.#doPutRecords(collection, data, {
          deleteBatchSize,
          upsertBatchSize,
        }, token);
      } finally {
        // Only remove the token if no newer write has replaced it meanwhile
        if (this.#writeCancels.get(collection) === token) {
          this.#writeCancels.delete(collection);
        }
      }
    });
  }

  /**
   * Diff the remote state of `collection` against `data` and apply the
   * resulting deletes and upserts in rate-limited batches. Stops between
   * batches once `token.cancelled` is set. Session errors clear the session
   * and are swallowed; other errors are rethrown.
   *
   * @param {string} collection
   * @param {Array<{ id: string }>} data
   * @param {{ deleteBatchSize: number, upsertBatchSize: number }} options
   * @param {{ cancelled: boolean }} token
   */
  async #doPutRecords(
    collection,
    data,
    { deleteBatchSize, upsertBatchSize },
    token,
  ) {
    const rpc = this.#rpc;
    const did = this.#did.value;
    if (!rpc || !did) return;

    this.#writing++;
    try {
      // 1. Fetch current state
      /** @type {Map<string, { rkey: string, value: unknown }>} */
      const existing = new Map();

      /** @type {string | undefined} */
      let cursor;
      do {
        const page = await ok(rpc.get("com.atproto.repo.listRecords", {
          params: {
            repo: did,
            collection:
              /** @type {`${string}.${string}.${string}`} */ (collection),
            limit: 100,
            cursor,
          },
        }));
        for (const { uri, value } of page.records) {
          const record = /** @type {any} */ (value);
          // The record key is the last path segment of the record URI
          const rkey = /** @type {string} */ (uri.split("/").at(-1));
          existing.set(record.id, { rkey, value: record });
        }
        cursor = page.cursor;
      } while (cursor);

      // 2. Build desired state
      const desired = new Map(
        data.map((record) => [record.id, { $type: collection, ...record }]),
      );

      // 3. Compute diff
      /** @type {unknown[]} */
      const deletes = [];

      /** @type {unknown[]} */
      const upserts = [];

      for (const [id, { rkey }] of existing) {
        if (!desired.has(id)) {
          deletes.push({
            $type: "com.atproto.repo.applyWrites#delete",
            collection,
            rkey,
          });
        }
      }

      for (const [id, record] of desired) {
        const entry = existing.get(id);

        if (!entry) {
          upserts.push({
            $type: "com.atproto.repo.applyWrites#create",
            collection,
            rkey: id,
            value: record,
          });
        } else if (JSON.stringify(entry.value) !== JSON.stringify(record)) {
          upserts.push({
            $type: "com.atproto.repo.applyWrites#update",
            collection,
            rkey: entry.rkey,
            value: record,
          });
        }
      }

      // 4. Apply batches, throttled to WRITE_RATE_LIMIT ops/hour.
      //    The write queue ensures we are the only writer, so one precise sleep
      //    is enough — no need to re-check in a loop.
      const applyBatch = async (/** @type {any[]} */ batch) => {
        const window = await this.#loadWriteWindow();

        // Number of recorded writes that must expire before this batch fits
        const needed = window.length + batch.length - WRITE_RATE_LIMIT;

        if (needed > 0 && window.length > 0) {
          const sorted = [...window].sort((a, b) => a.ts - b.ts);
          // Clamp the index: a batch larger than the limit itself would
          // otherwise point past the end of the window.
          const pivot = sorted[Math.min(needed, sorted.length) - 1];
          const waitMs = WRITE_WINDOW_MS - (Date.now() - pivot.ts) + 1;
          await new Promise((resolve) => setTimeout(resolve, waitMs));
        }

        const result = await ok(rpc.post("com.atproto.repo.applyWrites", {
          input: { repo: did, writes: batch },
        }));

        const writtenIds = batch.map((op) => op.rkey ?? op.value?.id).filter(
          Boolean,
        );
        await this.#recordWritten(writtenIds);

        if (result?.commit?.rev) {
          this.#rev.value = result.commit.rev;
          // Remember the rev so the firehose handler can skip our own commit
          this.#ownRevs.add(result.commit.rev);
        }
      };

      for (let i = 0; i < deletes.length; i += deleteBatchSize) {
        if (token.cancelled) return;
        await applyBatch(deletes.slice(i, i + deleteBatchSize));
      }

      for (let i = 0; i < upserts.length; i += upsertBatchSize) {
        if (token.cancelled) return;
        await applyBatch(upserts.slice(i, i + upsertBatchSize));
      }
    } catch (err) {
      if (this.#isSessionError(err)) {
        this.#clearSession();
        return;
      }

      throw err;
    } finally {
      this.#writing--;
    }
  }
}

export default ATProtoOutput;

////////////////////////////////////////////
// REGISTER
////////////////////////////////////////////

export const CLASS = ATProtoOutput;
export const NAME = "dor-atproto";

defineElement(NAME, ATProtoOutput);