forked from
tokono.ma/diffuse
A music player that connects to your cloud/distributed storage.
1import { Client, ClientResponseError, ok } from "@atcute/client";
2import { ComAtprotoSyncSubscribeRepos } from "@atcute/atproto";
3import { decode, encode } from "@atcute/cbor";
4import { xxh32r } from "xxh32/dist/raw.js";
5import * as IDB from "idb-keyval";
6
7import { computed, signal } from "~/common/signal.js";
8import { BroadcastedOutputElement, outputManager } from "../../common.js";
9import { defineElement } from "~/common/element.js";
10
11import {
12 clearStoredSession,
13 login,
14 logout,
15 OAuthUserAgent,
16 restoreOrFinalize,
17 TokenRefreshError,
18} from "./oauth.js";
19
20/**
21 * @import {Track, TrackBundle} from "~/definitions/types.d.ts"
22 * @import {OutputManager} from "../../types.d.ts"
23 * @import {ATProtoOutputElement} from "./types.d.ts"
24 */
25
26////////////////////////////////////////////
27// ELEMENT
28////////////////////////////////////////////
29
30const WRITE_WINDOW_MS = 3_600_000;
31const WRITE_RATE_LIMIT = 1500;
32const WRITE_IDB_KEY = "diffuse/output/raw/atproto/writes";
33
34/** @type {Set<string>} */
35const WATCHED_COLLECTIONS = new Set([
36 "sh.diffuse.output.facet",
37 "sh.diffuse.output.playlistItem",
38 "sh.diffuse.output.setting",
39 "sh.diffuse.output.trackBundle",
40]);
41
42/**
43 * @implements {ATProtoOutputElement}
44 */
45class ATProtoOutput extends BroadcastedOutputElement {
46 static NAME = "diffuse/output/raw/atproto";
47
48 #manager;
49
50 /** @type {PromiseWithResolvers<void>} */
51 #authenticated = Promise.withResolvers();
52
53 /** @type {Client | null} */
54 #rpc = null;
55
56 /** @type {OAuthUserAgent | null} */
57 #agent = null;
58
59 /** @type {string | null} */
60 #pdsUrl = null;
61
62 /** @type {AsyncIterator<any> | null} */
63 #firehoseIterator = null;
64 #firehoseGen = 0;
65
66 constructor() {
67 super();
68
69 /** @type {Track[] | null} */
70 let lastPersistedTracks = null;
71
72 /** @type {OutputManager} */
73 this.#manager = outputManager({
74 facets: {
75 empty: () => [],
76 get: () => this.listRecords("sh.diffuse.output.facet"),
77 put: (data) => this.putRecords("sh.diffuse.output.facet", data),
78 },
79 playlistItems: {
80 empty: () => [],
81 get: () => this.listRecords("sh.diffuse.output.playlistItem"),
82 put: (data) => this.putRecords("sh.diffuse.output.playlistItem", data),
83 },
84 settings: {
85 empty: () => [],
86 get: () => this.listRecords("sh.diffuse.output.setting"),
87 put: (data) => this.putRecords("sh.diffuse.output.setting", data),
88 },
89 tracks: {
90 empty: () => [],
91 get: async () => {
92 const bundles = await this.listRecords(
93 "sh.diffuse.output.trackBundle",
94 );
95
96 /** @type {Track[]} */
97 const tracks = [];
98
99 for (const bundle of bundles) {
100 if (!bundle.data?.ref?.$link) continue;
101 const bytes = await this.#fetchBlob(bundle.data.ref.$link);
102 tracks.push(...decode(bytes));
103 }
104
105 lastPersistedTracks = tracks;
106 return tracks;
107 },
108 put: async (data) => {
109 const hashCurrent = xxh32r(encode(lastPersistedTracks ?? []));
110 const hashNew = xxh32r(encode(data));
111
112 if (hashCurrent === hashNew) {
113 return;
114 }
115
116 const bytes = encode(data);
117 const blob = await this.#uploadBlob(bytes);
118 const id = xxh32r(bytes).toString(16);
119
120 /** @type {TrackBundle} */
121 const bundle = {
122 $type: "sh.diffuse.output.trackBundle",
123 id,
124 data: blob,
125 };
126
127 await this.putRecords("sh.diffuse.output.trackBundle", [bundle]);
128 lastPersistedTracks = data;
129 },
130 },
131 });
132
133 this.facets = this.#manager.facets;
134 this.playlistItems = this.#manager.playlistItems;
135 this.settings = this.#manager.settings;
136 this.tracks = this.#manager.tracks;
137 }
138
139 // SIGNALS
140
141 #did = signal(/** @type {`did:${string}:${string}` | null} */ (null));
142 #handle = signal(/** @type {string | null} */ (null));
143 #isOnline = signal(navigator.onLine);
144 #rev = signal(/** @type {string | null} */ (null));
145 #revFetchedAt = 0;
146 #ownRevs = new Set();
147 #writing = 0;
148
149 /** @type {Array<{ fn: () => Promise<void>, resolve: () => void, reject: (err: unknown) => void }>} */
150 #writeQueue = [];
151 #writeDraining = false;
152 /** @type {Map<string, { cancelled: boolean }>} */
153 #writeCancels = new Map();
154
155 did = this.#did.get;
156 handle = this.#handle.get;
157 rev = this.#rev.get;
158
159 ready = computed(() => {
160 return this.#did.value !== null && !!this.#rpc && this.#isOnline.value;
161 });
162
163 // LIFECYCLE
164
165 /** @override */
166 connectedCallback() {
167 this.replicateSavedData(this.#manager);
168
169 super.connectedCallback();
170
171 this.#tryRestore();
172
173 globalThis.addEventListener("online", this.#online);
174 globalThis.addEventListener("offline", this.#offline);
175 }
176
177 /** @override */
178 disconnectedCallback() {
179 this.#stopFirehose();
180 globalThis.removeEventListener("online", this.#online);
181 globalThis.removeEventListener("offline", this.#offline);
182 }
183
184 #offline = () => this.#isOnline.set(false);
185 #online = () => this.#isOnline.set(true);
186
187 // AUTH
188
189 /**
190 * Initiate the OAuth flow.
191 * Navigates the browser to the authorization server.
192 *
193 * @param {string} handle
194 */
195 async login(handle) {
196 await login(handle);
197 }
198
199 /**
200 * Sign out and revoke the current session.
201 */
202 async logout() {
203 if (this.#agent) {
204 this.#stopFirehose();
205 await logout(this.#agent);
206 this.#agent = null;
207 this.#authenticated = Promise.withResolvers();
208 this.#did.value = null;
209 this.#handle.value = null;
210 this.#pdsUrl = null;
211 this.#rpc = null;
212 }
213 }
214
215 /**
216 * Clear session state without contacting the server.
217 * Used when the session has already been revoked.
218 */
219 #clearSession() {
220 this.#stopFirehose();
221 this.#agent = null;
222 this.#authenticated = Promise.withResolvers();
223 this.#did.value = null;
224 this.#handle.value = null;
225 this.#pdsUrl = null;
226 this.#rpc = null;
227
228 clearStoredSession();
229 }
230
231 /**
232 * @param {unknown} err
233 * @returns {boolean}
234 */
235 #isSessionError(err) {
236 if (err instanceof TokenRefreshError) return true;
237 // OAuthUserAgent.handle() swallows TokenRefreshError and returns the
238 // original 401 response, which ok() wraps as a ClientResponseError.
239 if (err instanceof ClientResponseError && err.status === 401) return true;
240 if (err && typeof err === "object" && "cause" in err) {
241 return this.#isSessionError(/** @type {any} */ (err).cause);
242 }
243 return false;
244 }
245
246 async #tryRestore() {
247 await this.whenConnected();
248
249 try {
250 const session = await restoreOrFinalize();
251
252 if (session) {
253 this.#setSession(session);
254 }
255 } catch (err) {
256 if (this.#isSessionError(err)) {
257 this.#clearSession();
258 } else {
259 throw err;
260 }
261 }
262 }
263
264 /**
265 * @param {import("@atcute/oauth-browser-client").Session} session
266 */
267 #setSession(session) {
268 const agent = new OAuthUserAgent(session);
269
270 // Intercept token refresh to detect session revocation proactively.
271 // OAuthUserAgent.handle() swallows TokenRefreshError silently,
272 // so we hook into getSession to clear state as soon as refresh fails.
273 const originalGetSession = agent.getSession.bind(agent);
274 agent.getSession = /** @param {any[]} args */ (...args) => {
275 const promise = originalGetSession(...args);
276
277 promise.catch((err) => {
278 if (err instanceof TokenRefreshError) {
279 this.#clearSession();
280 }
281 });
282
283 return promise;
284 };
285
286 this.#agent = agent;
287 this.#rpc = new Client({ handler: agent });
288 this.#did.value = session.info.sub;
289 this.#pdsUrl = session.info.aud;
290 this.#authenticated.resolve();
291 this.#startFirehose();
292 this.#fetchHandle(session.info.sub);
293 }
294
295 /**
296 * @param {string} did
297 */
298 async #fetchHandle(did) {
299 const rpc = this.#rpc;
300 if (!rpc) return;
301 try {
302 const result = await ok(rpc.get("com.atproto.repo.describeRepo", {
303 params: { repo: /** @type {import("@atcute/lexicons").ActorIdentifier} */ (did) },
304 }));
305 if (this.#did.value === did) {
306 this.#handle.value = result.handle ?? null;
307 }
308 } catch {
309 // Non-fatal; handle stays null
310 }
311 }
312
313 // FIREHOSE
314
315 #stopFirehose() {
316 const iter = this.#firehoseIterator;
317 this.#firehoseIterator = null;
318 iter?.return?.();
319 }
320
321 async #startFirehose() {
322 this.#stopFirehose();
323
324 const gen = ++this.#firehoseGen;
325 const pdsUrl = this.#pdsUrl;
326 if (!pdsUrl) return;
327
328 const wssUrl = pdsUrl.replace(
329 /^https?:\/\//,
330 (m) => m === "https://" ? "wss://" : "ws://",
331 );
332
333 const { FirehoseSubscription } = await import("@atcute/firehose");
334
335 // Abort if superseded while awaiting the import
336 if (this.#firehoseGen !== gen) return;
337
338 const subscription = new FirehoseSubscription({
339 service: wssUrl,
340 nsid: ComAtprotoSyncSubscribeRepos.mainSchema,
341 validateMessages: false,
342 });
343
344 const iter = subscription[Symbol.asyncIterator]();
345 this.#firehoseIterator = iter;
346
347 try {
348 for await (const message of iter) {
349 this.#handleFirehoseCommit(message);
350 }
351 } catch {
352 // Non-fatal; partysocket handles reconnection automatically
353 }
354 }
355
356 /**
357 * @param {any} message
358 */
359 #handleFirehoseCommit(message) {
360 if (message.$type !== "com.atproto.sync.subscribeRepos#commit") return;
361 if (message.repo !== this.#did.value) return;
362
363 // Skip commits we made ourselves (all intermediate revs, not just the last)
364 if (this.#ownRevs.delete(message.rev)) return;
365
366 const touched = new Set(
367 (message.ops ?? [])
368 .map((/** @type {any} */ op) => op.path?.split("/")[0])
369 .filter((/** @type {string} */ c) => WATCHED_COLLECTIONS.has(c)),
370 );
371
372 if (touched.size === 0) return;
373 if (this.#writing > 0) return;
374
375 if (touched.has("sh.diffuse.output.facet")) this.#manager.facets.reload();
376 if (touched.has("sh.diffuse.output.playlistItem")) {
377 this.#manager.playlistItems.reload();
378 }
379 if (touched.has("sh.diffuse.output.setting")) {
380 this.#manager.settings.reload();
381 }
382 if (touched.has("sh.diffuse.output.trackBundle")) {
383 this.#manager.tracks.reload();
384 }
385 }
386
387 // RECORDS
388
389 /**
390 * Fetch the latest commit rev for this repo.
391 * Returns `null` if not authenticated or on error.
392 *
393 * @returns {Promise<string | null>}
394 */
395 async getLatestCommit() {
396 const did = this.#did.value;
397
398 const rpc = this.#rpc;
399 if (!rpc || !did) return null;
400
401 try {
402 const result = await ok(rpc.get(
403 "com.atproto.sync.getLatestCommit",
404 { params: { did } },
405 ));
406
407 this.#rev.value = result?.rev;
408 this.#revFetchedAt = Date.now();
409 return result?.rev;
410 } catch (err) {
411 if (this.#isSessionError(err)) {
412 this.#clearSession();
413 return null;
414 }
415
416 throw err;
417 }
418 }
419
420 /**
421 * @param {Uint8Array} bytes
422 * @returns {Promise<any>}
423 */
424 async #uploadBlob(bytes) {
425 const rpc = this.#rpc;
426 if (!rpc) return;
427 const result = await ok(rpc.post("com.atproto.repo.uploadBlob", {
428 input: bytes,
429 headers: { "content-type": "application/octet-stream" },
430 }));
431 return result.blob;
432 }
433
434 /**
435 * @param {string} cid
436 * @returns {Promise<Uint8Array>}
437 */
438 async #fetchBlob(cid) {
439 const rpc = this.#rpc;
440 const did = this.#did.value;
441 if (!rpc || !did) return new Uint8Array();
442 return await ok(rpc.get("com.atproto.sync.getBlob", {
443 params: { did, cid },
444 as: "bytes",
445 }));
446 }
447
448 /**
449 * @template T
450 * @param {string} collection
451 * @param {string} [did]
452 * @returns {Promise<T[]>}
453 */
454 async listRecords(collection, did) {
455 did ??= this.#did.value ?? undefined;
456
457 if (!this.#rpc || !did) return [];
458
459 try {
460 const records = [];
461 /** @type {string | undefined} */
462 let cursor;
463 do {
464 const page = await ok(this.#rpc.get("com.atproto.repo.listRecords", {
465 params: { repo: /** @type {import("@atcute/lexicons").ActorIdentifier} */ (did), collection: /** @type {`${string}.${string}.${string}`} */ (collection), limit: 100, cursor },
466 }));
467 records.push(...page.records.map((r) => /** @type {T} */ (r.value)));
468 cursor = page.cursor;
469 } while (cursor);
470 return records;
471 } catch (err) {
472 if (this.#isSessionError(err)) {
473 this.#clearSession();
474 return [];
475 }
476
477 throw err;
478 }
479 }
480
481 // WRITE QUEUE
482
483 /** @returns {Promise<{ id: string, ts: number }[]>} */
484 async #loadWriteWindow() {
485 const now = Date.now();
486 const all = /** @type {{ id: string, ts: number }[]} */ (
487 await IDB.get(WRITE_IDB_KEY) ?? []
488 );
489 return all.filter((e) => now - e.ts < WRITE_WINDOW_MS);
490 }
491
492 /** @param {string[]} ids */
493 async #recordWritten(ids) {
494 const now = Date.now();
495 const window = await this.#loadWriteWindow();
496 await IDB.set(WRITE_IDB_KEY, [
497 ...window,
498 ...ids.map((id) => ({ id, ts: now })),
499 ]);
500 }
501
502 /**
503 * @param {() => Promise<void>} fn
504 * @returns {Promise<void>}
505 */
506 #enqueueWrite(fn) {
507 return new Promise((resolve, reject) => {
508 this.#writeQueue.push({ fn, resolve, reject });
509 this.#drainWrites();
510 });
511 }
512
513 async #drainWrites() {
514 if (this.#writeDraining) return;
515 this.#writeDraining = true;
516
517 while (this.#writeQueue.length > 0) {
518 const { fn, resolve, reject } =
519 /** @type {{ fn: () => Promise<void>, resolve: () => void, reject: (err: unknown) => void }} */ (
520 this.#writeQueue.shift()
521 );
522 try {
523 await fn();
524 resolve();
525 } catch (err) {
526 reject(err);
527 }
528 }
529
530 this.#writeDraining = false;
531 }
532
533 /**
534 * @param {string} collection
535 * @param {Array<{ id: string }>} data
536 * @param {{ deleteBatchSize?: number, upsertBatchSize?: number }} [options]
537 */
538 async putRecords(
539 collection,
540 data,
541 { deleteBatchSize = 100, upsertBatchSize = deleteBatchSize } = {},
542 ) {
543 if (!this.#rpc || !this.#did.value) return;
544
545 // Supersede any prior write for this collection
546 const prior = this.#writeCancels.get(collection);
547 if (prior) prior.cancelled = true;
548
549 const token = { cancelled: false };
550 this.#writeCancels.set(collection, token);
551
552 return this.#enqueueWrite(async () => {
553 if (token.cancelled) return;
554 try {
555 await this.#doPutRecords(collection, data, {
556 deleteBatchSize,
557 upsertBatchSize,
558 }, token);
559 } finally {
560 if (this.#writeCancels.get(collection) === token) {
561 this.#writeCancels.delete(collection);
562 }
563 }
564 });
565 }
566
567 /**
568 * @param {string} collection
569 * @param {Array<{ id: string }>} data
570 * @param {{ deleteBatchSize: number, upsertBatchSize: number }} options
571 * @param {{ cancelled: boolean }} token
572 */
573 async #doPutRecords(
574 collection,
575 data,
576 { deleteBatchSize, upsertBatchSize },
577 token,
578 ) {
579 const rpc = this.#rpc;
580 const did = this.#did.value;
581 if (!rpc || !did) return;
582
583 this.#writing++;
584 try {
585 // 1. Fetch current state
586 /** @type {Map<string, { rkey: string, value: unknown }>} */
587 const existing = new Map();
588
589 /** @type {string | undefined} */
590 let cursor;
591 do {
592 const page = await ok(rpc.get("com.atproto.repo.listRecords", {
593 params: { repo: did, collection: /** @type {`${string}.${string}.${string}`} */ (collection), limit: 100, cursor },
594 }));
595 for (const { uri, value } of page.records) {
596 const record = /** @type {any} */ (value);
597 const rkey = /** @type {string} */ (uri.split("/").at(-1));
598 existing.set(record.id, { rkey, value: record });
599 }
600 cursor = page.cursor;
601 } while (cursor);
602
603 // 2. Build desired state
604 const desired = new Map(
605 data.map((record) => [record.id, { $type: collection, ...record }]),
606 );
607
608 // 3. Compute diff
609 /** @type {unknown[]} */
610 const deletes = [];
611
612 /** @type {unknown[]} */
613 const upserts = [];
614
615 for (const [id, { rkey }] of existing) {
616 if (!desired.has(id)) {
617 deletes.push({
618 $type: "com.atproto.repo.applyWrites#delete",
619 collection,
620 rkey,
621 });
622 }
623 }
624
625 for (const [id, record] of desired) {
626 const entry = existing.get(id);
627
628 if (!entry) {
629 upserts.push({
630 $type: "com.atproto.repo.applyWrites#create",
631 collection,
632 rkey: id,
633 value: record,
634 });
635 } else if (JSON.stringify(entry.value) !== JSON.stringify(record)) {
636 upserts.push({
637 $type: "com.atproto.repo.applyWrites#update",
638 collection,
639 rkey: entry.rkey,
640 value: record,
641 });
642 }
643 }
644
645 // 4. Apply batches, throttled to WRITE_RATE_LIMIT ops/hour.
646 // The write queue ensures we are the only writer, so one precise sleep
647 // is enough — no need to re-check in a loop.
648 const applyBatch = async (/** @type {any[]} */ batch) => {
649 const window = await this.#loadWriteWindow();
650
651 if (window.length + batch.length > WRITE_RATE_LIMIT) {
652 const needed = window.length + batch.length - WRITE_RATE_LIMIT;
653 const sorted = [...window].sort((a, b) => a.ts - b.ts);
654 const waitMs = WRITE_WINDOW_MS -
655 (Date.now() - sorted[needed - 1].ts) + 1;
656 await new Promise((resolve) => setTimeout(resolve, waitMs));
657 }
658
659 const result = await ok(rpc.post("com.atproto.repo.applyWrites", {
660 input: { repo: did, writes: batch },
661 }));
662
663 const writtenIds = batch.map((op) => op.rkey ?? op.value?.id).filter(
664 Boolean,
665 );
666 await this.#recordWritten(writtenIds);
667
668 if (result?.commit?.rev) {
669 this.#rev.value = result.commit.rev;
670 this.#ownRevs.add(result.commit.rev);
671 }
672 };
673
674 for (let i = 0; i < deletes.length; i += deleteBatchSize) {
675 if (token.cancelled) return;
676 await applyBatch(deletes.slice(i, i + deleteBatchSize));
677 }
678
679 for (let i = 0; i < upserts.length; i += upsertBatchSize) {
680 if (token.cancelled) return;
681 await applyBatch(upserts.slice(i, i + upsertBatchSize));
682 }
683 } catch (err) {
684 if (this.#isSessionError(err)) {
685 this.#clearSession();
686 return;
687 }
688
689 throw err;
690 } finally {
691 this.#writing--;
692 }
693 }
694}
695
696export default ATProtoOutput;
697
698////////////////////////////////////////////
699// REGISTER
700////////////////////////////////////////////
701
702export const CLASS = ATProtoOutput;
703export const NAME = "dor-atproto";
704
705defineElement(NAME, ATProtoOutput);