A music player that connects to your cloud/distributed storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: better track syncing for atproto

+70 -61
+64 -22
src/components/output/raw/atproto/element.js
··· 1 1 import { Client, ClientResponseError, ok } from "@atcute/client"; 2 2 import { ComAtprotoSyncSubscribeRepos } from "@atcute/atproto"; 3 - import { encode } from "@atcute/cbor"; 3 + import { decode, encode } from "@atcute/cbor"; 4 4 import { xxh32r } from "xxh32/dist/raw.js"; 5 5 import * as Repo from "@atcute/repo"; 6 6 import * as IDB from "idb-keyval"; ··· 94 94 "sh.diffuse.output.trackBundle", 95 95 ); 96 96 97 - const tracks = bundles.flatMap((bundle) => bundle.tracks ?? []); 97 + /** @type {Track[]} */ 98 + const tracks = []; 99 + 100 + for (const bundle of bundles) { 101 + if (!bundle.data?.ref?.$link) continue; 102 + const bytes = await this.#fetchBlob(bundle.data.ref.$link); 103 + tracks.push(...decode(bytes)); 104 + } 105 + 98 106 lastPersistedTracks = tracks; 99 - 100 107 return tracks; 101 108 }, 102 109 put: async (data) => { ··· 107 114 return; 108 115 } 109 116 110 - /** @type {TrackBundle[]} */ 111 - const bundles = []; 117 + const bytes = encode(data); 118 + const blob = await this.#uploadBlob(bytes); 119 + const id = xxh32r(bytes).toString(16); 112 120 113 - for (let i = 0; i < data.length; i += 100) { 114 - const chunk = data.slice(i, i + 100); 115 - bundles.push({ 116 - $type: "sh.diffuse.output.trackBundle", 117 - id: xxh32r(encode(chunk)).toString(16), 118 - tracks: chunk, 119 - }); 120 - } 121 + /** @type {TrackBundle} */ 122 + const bundle = { 123 + $type: "sh.diffuse.output.trackBundle", 124 + id, 125 + data: blob, 126 + }; 121 127 122 - await this.putRecords("sh.diffuse.output.trackBundle", bundles, { 123 - upsertBatchSize: 1, 124 - }); 128 + await this.putRecords("sh.diffuse.output.trackBundle", [bundle]); 125 129 126 130 lastPersistedTracks = data; 127 131 }, ··· 429 433 } 430 434 431 435 /** 436 + * @param {Uint8Array} bytes 437 + * @returns {Promise<any>} 438 + */ 439 + async #uploadBlob(bytes) { 440 + const rpc = this.#rpc; 441 + if (!rpc) return; 442 + const result = await ok(rpc.post("com.atproto.repo.uploadBlob", { 443 + input: bytes, 444 + headers: { "content-type": "application/octet-stream" }, 445 + })); 446 + return result.blob; 447 + } 448 + 449 + /** 450 + * @param {string} cid 451 + * @returns {Promise<Uint8Array>} 452 + */ 453 + async #fetchBlob(cid) { 454 + const rpc = this.#rpc; 455 + const did = this.#did.value; 456 + if (!rpc || !did) return new Uint8Array(); 457 + return await ok(rpc.get("com.atproto.sync.getBlob", { 458 + params: { did, cid }, 459 + as: "bytes", 460 + })); 461 + } 462 + 463 + /** 432 464 * @template T 433 465 * @param {string} collection 434 466 * @param {string} [did] ··· 497 529 this.#writeDraining = true; 498 530 499 531 while (this.#writeQueue.length > 0) { 500 - const { fn, resolve, reject } = /** @type {{ fn: () => Promise<void>, resolve: () => void, reject: (err: unknown) => void }} */ ( 501 - this.#writeQueue.shift() 502 - ); 532 + const { fn, resolve, reject } = 533 + /** @type {{ fn: () => Promise<void>, resolve: () => void, reject: (err: unknown) => void }} */ ( 534 + this.#writeQueue.shift() 535 + ); 503 536 try { 504 537 await fn(); 505 538 resolve(); ··· 533 566 return this.#enqueueWrite(async () => { 534 567 if (token.cancelled) return; 535 568 try { 536 - await this.#doPutRecords(collection, data, { deleteBatchSize, upsertBatchSize }, token); 569 + await this.#doPutRecords(collection, data, { 570 + deleteBatchSize, 571 + upsertBatchSize, 572 + }, token); 537 573 } finally { 538 574 if (this.#writeCancels.get(collection) === token) { 539 575 this.#writeCancels.delete(collection); ··· 548 584 * @param {{ deleteBatchSize: number, upsertBatchSize: number }} options 549 585 * @param {{ cancelled: boolean }} token 550 586 */ 551 - async #doPutRecords(collection, data, { deleteBatchSize, upsertBatchSize }, token) { 587 + async #doPutRecords( 588 + collection, 589 + data, 590 + { deleteBatchSize, upsertBatchSize }, 591 + token, 592 + ) { 552 593 const rpc = this.#rpc; 553 594 const did = this.#did.value; 554 595 if (!rpc || !did) return; ··· 620 661 if (window.length + batch.length > WRITE_RATE_LIMIT) { 621 662 const needed = window.length + batch.length - WRITE_RATE_LIMIT; 622 663 const sorted = [...window].sort((a, b) => a.ts - b.ts); 623 - const waitMs = WRITE_WINDOW_MS - (Date.now() - sorted[needed - 1].ts) + 1; 664 + const waitMs = WRITE_WINDOW_MS - 665 + (Date.now() - sorted[needed - 1].ts) + 1; 624 666 await new Promise((resolve) => setTimeout(resolve, waitMs)); 625 667 } 626 668
+1 -1
src/components/output/raw/atproto/oauth-client-metadata.json
··· 3 3 "client_name": "Diffuse", 4 4 "client_uri": "https://elements.diffuse.sh", 5 5 "redirect_uris": ["https://elements.diffuse.sh/oauth/callback"], 6 - "scope": "atproto repo?collection=sh.diffuse.output.facet&collection=sh.diffuse.output.playlistItem&collection=sh.diffuse.output.playlistItemBundle&collection=sh.diffuse.output.setting&collection=sh.diffuse.output.track&collection=sh.diffuse.output.trackBundle", 6 + "scope": "atproto blob:application/octet-stream repo?collection=sh.diffuse.output.facet&collection=sh.diffuse.output.playlistItem&collection=sh.diffuse.output.setting&collection=sh.diffuse.output.track&collection=sh.diffuse.output.trackBundle", 7 7 "grant_types": ["authorization_code", "refresh_token"], 8 8 "response_types": ["code"], 9 9 "token_endpoint_auth_method": "none",
-1
src/definitions/index.ts
··· 1 1 export * as ShDiffuseOutputCollaboration from "./types/sh/diffuse/output/collaboration.ts"; 2 2 export * as ShDiffuseOutputFacet from "./types/sh/diffuse/output/facet.ts"; 3 3 export * as ShDiffuseOutputPlaylistItem from "./types/sh/diffuse/output/playlistItem.ts"; 4 - export * as ShDiffuseOutputPlaylistItemBundle from "./types/sh/diffuse/output/playlistItemBundle.ts"; 5 4 export * as ShDiffuseOutputSetting from "./types/sh/diffuse/output/setting.ts"; 6 5 export * as ShDiffuseOutputTrack from "./types/sh/diffuse/output/track.ts"; 7 6 export * as ShDiffuseOutputTrackBundle from "./types/sh/diffuse/output/trackBundle.ts";
-23
src/definitions/output/playlistItemBundle.json
··· 1 - { 2 - "lexicon": 1, 3 - "id": "sh.diffuse.output.playlistItemBundle", 4 - "defs": { 5 - "main": { 6 - "type": "record", 7 - "record": { 8 - "type": "object", 9 - "required": ["id", "playlistItems"], 10 - "properties": { 11 - "id": { "type": "string" }, 12 - "createdAt": { "type": "string", "format": "datetime" }, 13 - "playlistItems": { 14 - "type": "array", 15 - "description": "A bundle of playlist items", 16 - "items": { "type": "ref", "ref": "sh.diffuse.output.playlistItem" } 17 - }, 18 - "updatedAt": { "type": "string", "format": "datetime" } 19 - } 20 - } 21 - } 22 - } 23 - }
+5 -5
src/definitions/output/trackBundle.json
··· 6 6 "type": "record", 7 7 "record": { 8 8 "type": "object", 9 - "required": ["id", "tracks"], 9 + "required": ["id", "data"], 10 10 "properties": { 11 11 "id": { "type": "string" }, 12 12 "createdAt": { "type": "string", "format": "datetime" }, 13 - "tracks": { 14 - "type": "array", 15 - "description": "A bundle of tracks", 16 - "items": { "type": "ref", "ref": "sh.diffuse.output.track" } 13 + "data": { 14 + "type": "blob", 15 + "description": "CBOR-encoded tracks", 16 + "accept": ["application/octet-stream"] 17 17 }, 18 18 "updatedAt": { "type": "string", "format": "datetime" } 19 19 }
-4
src/definitions/types.d.ts
··· 10 10 Transformation, 11 11 } from "./types/sh/diffuse/output/playlistItem.ts"; 12 12 13 - export type { 14 - Main as PlaylistItemBundle, 15 - } from "./types/sh/diffuse/output/playlistItemBundle.ts"; 16 - 17 13 export type { Main as Theme } from "./types/sh/diffuse/output/theme.ts"; 18 14 19 15 export type { Main as Setting } from "./types/sh/diffuse/output/setting.ts";
-4
src/elements.vto
··· 200 200 desc: > 201 201 Represents a single item in a playlist. Tracks are matched based on the given criteria. A playlist is formed by grouping items by their playlist property. 202 202 url: "definitions/output/playlistItem.json" 203 - - title: "Output / Playlist Item Bundle" 204 - desc: > 205 - A bundle of playlist items. 206 - url: "definitions/output/playlistItemBundle.json" 207 203 - title: "Output / Progress" 208 204 desc: > 209 205 Used to track progress of (long) audio playback.
-1
tasks/replace-gen-import-extensions.ts
··· 9 9 } 10 10 11 11 replace("./src/definitions/index.ts"); 12 - replace("./src/definitions/types/sh/diffuse/output/playlistItemBundle.ts"); 13 12 replace("./src/definitions/types/sh/diffuse/output/trackBundle.ts");