A music player that connects to your cloud/distributed storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: process tracks in a worker (#448)

* wip: process worker

* fix: ProxiedActions type

* feat: process tracks in a worker

authored by

Steven Vandevelde and committed by
GitHub
571894d4 d1e5141f

+370 -95
+12
src/common/constants.js
··· 1 + /** 2 + * @import {InputActions} from "./types.d.ts" 3 + */ 4 + 5 + /** @type {Array<keyof InputActions>} */ 6 + export const INPUT_ACTIONS = [ 7 + "consult", 8 + "contextualize", 9 + "groupConsult", 10 + "list", 11 + "resolve", 12 + ];
+19 -5
src/common/types.d.ts
··· 1 - import type { Track } from "../definitions/types.d.ts"; 1 + import type { Track } from "@definitions/types.d.ts"; 2 + import type { 3 + PortProviderMethod, 4 + ProxiedActions, 5 + ProxyProviderMethod, 6 + WorkerProviderMethod, 7 + } from "./worker.d.ts"; 2 8 3 9 // RE-EXPORT 4 10 5 - export type { Track } from "../definitions/types.d.ts"; 11 + export type { Track, TrackStats, TrackTags } from "@definitions/types.d.ts"; 6 12 7 - /* INPUT */ 13 + // INPUT 8 14 9 15 /** 10 16 * Consultation. ··· 32 38 ): Promise<ResolvedUri>; 33 39 }; 34 40 35 - export type InputElement = HTMLElement & InputActions; 41 + export type InputElement = 42 + & HTMLElement 43 + & WorkerProviderMethod 44 + & ProxiedActions<InputActions> 45 + & ProxyProviderMethod<InputActions>; 36 46 37 - /* TRACKS */ 47 + // MISC 48 + 49 + export type IncompleteArray<T> = ["Missing required items", T]; 50 + 51 + // TRACKS 38 52 39 53 export type ResolvedUri = undefined | { url: string; expiresAt: number }; // TODO: Streams?
+39
src/common/worker.d.ts
··· 6 6 * messages sent via `postMessage`. 7 7 */ 8 8 export type MRpcBaseMsg = { ns: string; name: string; key: number }; 9 + 10 + /** */ 11 + export type PortProvider = () => { 12 + disconnect(): void; 13 + port: MessagePort; 14 + }; 15 + 16 + /** */ 17 + export type PortProviderMethod = { port: PortProvider }; 18 + 19 + /** */ 20 + export type ProxiedActions< 21 + Actions extends Record<string, (...args: any[]) => any>, 22 + > = { 23 + [A in keyof Actions]: ProxiedAction<Actions[A]>; 24 + }; 25 + 26 + export type ProxiedAction< 27 + Action extends (...args: any[]) => any, 28 + PromisedReturn = 29 + (ReturnType<Action> extends Promise<unknown> ? ReturnType<Action> 30 + : Promise<ReturnType<Action>>), 31 + > = (...args: Parameters<Action>) => PromisedReturn; 32 + 33 + /** */ 34 + export type ProxyProvider< 35 + Actions extends Record<string, (...args: any[]) => any>, 36 + > = (workerOrPort: MessagePort | Worker) => ProxiedActions<Actions>; 37 + 38 + /** */ 39 + export type ProxyProviderMethod< 40 + Actions extends Record<string, (...args: any[]) => any>, 41 + > = { proxy: ProxyProvider<Actions> }; 42 + 43 + /** */ 44 + export type WorkerProvider = (group?: string) => Worker; 45 + 46 + /** */ 47 + export type WorkerProviderMethod = { worker: WorkerProvider };
+88 -8
src/common/worker.js
··· 1 1 import Queue from "@mary/ds-queue"; 2 - import { defineWorkerFn, useWorkerFn } from "@mys/worker-fn"; 2 + 3 + import { MRpc } from "@mys/m-rpc"; 3 4 import { getTransferables } from "@okikio/transferables"; 4 5 import { debounceMicrotask } from "@vicary/debounce-microtask"; 5 6 import { xxh32 } from "xxh32"; 7 + 6 8 import { batch } from "./signal.js"; 7 9 10 + export { getTransferables } from "@okikio/transferables"; 11 + 8 12 /** 9 13 * @import {MRpcCallOptions, WorkerGlobalScope} from "@mys/m-rpc"; 10 - * @import {Announcement} from "./worker.d.ts" 14 + * 15 + * @import {IncompleteArray} from "./types.d.ts" 16 + * @import {Announcement, ProxiedActions, ProxyProvider} from "./worker.d.ts" 11 17 */ 12 18 13 19 //////////////////////////////////////////// ··· 46 52 ); 47 53 } 48 54 55 + /** 56 + * @param {MessagePort | Worker} workerOrPort 57 + */ 58 + export function portProvider(workerOrPort) { 59 + return () => { 60 + const channel = new MessageChannel(); 61 + 62 + channel.port1.addEventListener("message", (event) => { 63 + workerOrPort.postMessage(event.data); 64 + }); 65 + 66 + /** 67 + * @param {Event} event 68 + */ 69 + const workerListener = (event) => { 70 + const msgEvent = /** @type {MessageEvent} */ (event); 71 + channel.port1.postMessage(msgEvent.data); 72 + }; 73 + 74 + workerOrPort.addEventListener("message", workerListener); 75 + 76 + channel.port1.start(); 77 + channel.port2.start(); 78 + 79 + return { 80 + disconnect: () => { 81 + workerOrPort.removeEventListener("message", workerListener); 82 + channel.port1.close(); 83 + channel.port2.close(); 84 + }, 85 + port: channel.port2, 86 + }; 87 + }; 88 + } 89 + 90 + /** 91 + * @template {Record<string, (...args: any[]) => any>} Actions 92 + * @template {keyof Actions} T 93 + * @template {T[]} U 94 + * @param {U & ([T] extends [U[number]] ? unknown : IncompleteArray<T>[])} actions 95 + */ 96 + export function proxyProvider(actions) { 97 + /** 98 + * @type {ProxyProvider<Actions>} 99 + */ 100 + return (workerOrPort) => { 101 + /** @type {Record<string | number | symbol, (...args: any[]) => any>} */ 102 + const proxy = {}; 103 + 104 + actions.forEach((action) => { 105 + proxy[action] = use(action.toString(), workerOrPort); 106 + }); 107 + 108 + return /** @type {ProxiedActions<Actions>} */ (proxy); 109 + }; 110 + } 111 + 49 112 //////////////////////////////////////////// 50 113 // RAW 51 114 //////////////////////////////////////////// ··· 99 162 fn, 100 163 context = /** @type {WorkerGlobalScope} */ (globalThis), 101 164 ) { 102 - return defineWorkerFn(name, fn, { 103 - port: /** @type {any} */ (context), 104 - }); 165 + const rpc = MRpc.ensureMRpc(context); 166 + return rpc.defineLocalFn(name, fn); 105 167 } 106 168 107 169 /** 170 + * @template {(...args: I[]) => O} Fn 171 + * @template I 172 + * @template O 108 173 * @param {string} name 109 174 * @param {MessagePort | Worker | WorkerGlobalScope} [context] Uses `globalThis` by default. 110 175 * @param {MRpcCallOptions} [options] ··· 114 179 context = /** @type {WorkerGlobalScope} */ (globalThis), 115 180 options, 116 181 ) { 117 - return useWorkerFn(name, /** @type {any} */ (context), { 118 - timeout: 60000, 119 - ...(options || {}), 182 + const rpc = MRpc.ensureMRpc(context); 183 + const _fn = rpc.useRemoteFn(name, { timeout: 60000, ...(options || {}) }); 184 + 185 + const fn = /** @type {Fn} */ (async (...args) => { 186 + try { 187 + return await _fn(...args); 188 + } catch (err) { 189 + if ( 190 + err instanceof Error && 191 + err.message === 192 + `The remote threw an error when calling the function "${name}".` 193 + ) { 194 + err.message = `The worker function "${name}" throws an error.`; 195 + } 196 + throw err; 197 + } 120 198 }); 199 + 200 + return fn; 121 201 } 122 202 123 203 ////////////////////////////////////////////
+12 -12
src/components/engine/queue/element.js
··· 2 2 3 3 import { DiffuseElement } from "@common/element.js"; 4 4 import { signal } from "@common/signal.js"; 5 - import { listen, use } from "@common/worker.js"; 5 + import { listen, proxyProvider } from "@common/worker.js"; 6 6 7 7 /** 8 - * @import {ActionsProxied, Item} from "./types.d.ts" 8 + * @import {ProxiedActions, ProxyProvider} from "@common/worker.d.ts"; 9 + * @import {Actions, Item} from "./types.d.ts" 9 10 */ 10 11 11 12 //////////////////////////////////////////// ··· 13 14 //////////////////////////////////////////// 14 15 15 16 /** 16 - * @implements {ActionsProxied} 17 + * @implements {ProxiedActions<Actions>} 17 18 */ 18 19 class QueueEngine extends DiffuseElement { 19 20 constructor() { ··· 44 45 listen("now", this.#now.set, port); 45 46 listen("past", this.#past.set, port); 46 47 47 - /** @type {ActionsProxied['add']} */ 48 - this.add = use("add", port); 48 + /** @type {ProxyProvider<Actions>} */ 49 + const proxy = proxyProvider(["add", "pool", "shift", "unshift"]); 49 50 50 - /** @type {ActionsProxied['pool']} */ 51 - this.pool = use("pool", port); 51 + // Worker proxy 52 + const w = proxy(port); 52 53 53 - /** @type {ActionsProxied['shift']} */ 54 - this.shift = use("shift", port); 55 - 56 - /** @type {ActionsProxied['unshift']} */ 57 - this.unshift = use("unshift", port); 54 + this.add = w.add; 55 + this.pool = w.pool; 56 + this.shift = w.shift; 57 + this.unshift = w.unshift; 58 58 } 59 59 60 60 // SIGNALS
+3 -10
src/components/engine/queue/types.d.ts
··· 1 - import type { Track, TrackStats, TrackTags } from "@common/types.d.ts"; 1 + import type { Track } from "@common/types.d.ts"; 2 2 import type { SignalReader } from "@common/signal.d.ts"; 3 3 4 4 export type Actions = { ··· 8 8 unshift: () => void; 9 9 }; 10 10 11 - export type ActionsProxied = { 12 - add: (args: { inFront?: boolean; items: Item[] }) => Promise<void>; 13 - pool: (tracks: Track[]) => Promise<void>; 14 - shift: () => Promise<void>; 15 - unshift: () => Promise<void>; 16 - }; 17 - 18 - export type Item<Stats = TrackStats, Tags = TrackTags> = 19 - & Track<Stats, Tags> 11 + export type Item = 12 + & Track 20 13 & { manualEntry?: boolean }; 21 14 22 15 export type State = {
+35 -10
src/components/input/opensubsonic/element.js
··· 1 1 import { DiffuseElement } from "@common/element.js"; 2 - import { use } from "@common/worker.js"; 2 + import { portProvider, proxyProvider } from "@common/worker.js"; 3 3 4 4 /** 5 5 * @import {InputActions} from "@common/types.d.ts" 6 + * @import {PortProviderMethod, ProxiedActions, ProxyProvider, ProxyProviderMethod} from "@common/worker.d.ts" 6 7 */ 7 8 8 9 //////////////////////////////////////////// ··· 10 11 //////////////////////////////////////////// 11 12 12 13 /** 13 - * @implements {InputActions} 14 + * @implements {ProxiedActions<InputActions>} 15 + * @implements {PortProviderMethod} 16 + * @implements {ProxyProviderMethod<InputActions>} 14 17 */ 15 18 class OpensubsonicInput extends DiffuseElement { 16 19 constructor() { 17 20 super(); 18 21 19 22 // Setup worker 20 - const name = `diffuse/input/opensubsonic/${this.group}`; 23 + const worker = this.worker(this.group); 24 + 25 + /** @type {ProxyProvider<InputActions>} */ 26 + this.proxy = proxyProvider([ 27 + "consult", 28 + "contextualize", 29 + "groupConsult", 30 + "list", 31 + "resolve", 32 + ]); 33 + 34 + // Worker proxy 35 + const w = this.proxy(worker); 36 + 37 + this.consult = w.consult; 38 + this.contextualize = w.contextualize; 39 + this.groupConsult = w.groupConsult; 40 + this.list = w.list; 41 + this.resolve = w.resolve; 42 + 43 + // Provide a channel to the worker 44 + this.port = portProvider(worker); 45 + } 46 + 47 + /** 48 + * @param {string} [group] 49 + */ 50 + worker(group) { 51 + const name = `diffuse/input/opensubsonic/${group || crypto.randomUUID()}`; 21 52 const url = "/components/input/opensubsonic/worker.js"; 22 - const worker = new Worker(url, { name, type: "module" }); 23 53 24 - // Worker proxy 25 - this.consult = use("consult", worker); 26 - this.contextualize = use("contextualize", worker); 27 - this.groupConsult = use("groupConsult", worker); 28 - this.list = use("list", worker); 29 - this.resolve = use("resolve", worker); 54 + return new Worker(url, { name, type: "module" }); 30 55 } 31 56 } 32 57
+2
src/components/input/opensubsonic/worker.js
··· 148 148 149 149 /** @type {Track} */ 150 150 const track = { 151 + $type: "sh.diffuse.output.tracks", 151 152 id: crypto.randomUUID(), 152 153 kind: autoTypeToTrackKind(song.type), 153 154 uri: buildURI(server, { songId: song.id, path }), ··· 234 235 // picked up whenever it is re-contextualized. 235 236 return Object.values(servers).map((server) => { 236 237 return { 238 + $type: "sh.diffuse.output.tracks", 237 239 id: crypto.randomUUID(), 238 240 kind: "placeholder", 239 241 uri: buildURI(server),
+2
src/components/input/s3/worker.js
··· 110 110 111 111 /** @type {Track} */ 112 112 const track = { 113 + $type: "sh.diffuse.output.tracks", 113 114 id, 114 115 stats, 115 116 tags, ··· 164 165 165 166 /** @type {Track} */ 166 167 const track = { 168 + $type: "sh.diffuse.output.tracks", 167 169 id: crypto.randomUUID(), 168 170 kind: "placeholder", 169 171 uri,
+40 -44
src/components/orchestrator/process-tracks/element.js
··· 1 - import deepDiff from "@fry69/deep-diff"; 2 - 3 1 import { DiffuseElement, query } from "@common/element.js"; 4 - import { signal } from "@common/signal.js"; 2 + import { signal, untracked } from "@common/signal.js"; 3 + import { getTransferables, portProvider, use } from "@common/worker.js"; 5 4 6 5 /** 7 6 * @import {InputElement, Track} from "@common/types.d.ts" ··· 18 17 * from the assigned output element. 19 18 */ 20 19 class ProcessTracksOrchestrator extends DiffuseElement { 20 + #external; 21 + #process; 22 + 21 23 constructor() { 22 24 super(); 23 25 26 + // Setup worker 27 + const name = `diffuse/orchestrator/process-tracks/${this.group}`; 28 + const url = "/components/orchestrator/process-tracks/worker.js"; 29 + const worker = new Worker(url, { name, type: "module" }); 30 + 24 31 /** @type {InputElement} */ 25 32 this.input = query(this, "input-selector"); 26 33 ··· 29 36 30 37 /** @type {import("@components/processor/metadata/element.js").CLASS} */ 31 38 this.metadataProcessor = query(this, "metadata-processor-selector"); 39 + 40 + // Create new workers specially for track processing 41 + this.#external = { 42 + input: portProvider(this.input.worker()), 43 + metadataProcessor: portProvider(this.metadataProcessor.worker()), 44 + }; 45 + 46 + // Worker proxy 47 + this.#process = use("process", worker, { 48 + timeout: 60000 * 60 * 2, // 2 hours 49 + transfer: getTransferables, 50 + }); 32 51 } 33 52 34 53 // SIGNALS ··· 55 74 const state = this.output.tracks.state(); 56 75 if (state !== "loaded") return; 57 76 58 - this.process(); 77 + untracked(() => this.process()); 59 78 }); 60 79 } 61 80 ··· 71 90 72 91 const cachedTracks = this.output.tracks.collection(); 73 92 74 - // Contextualize 75 - await this.input.contextualize(cachedTracks); 76 - 77 - // List 78 - const tracks = await this.input.list(cachedTracks); 79 - 80 - // Fetch metadata if needed 81 - const tracksWithMetadata = await tracks.reduce( 82 - /** 83 - * @param {Promise<Track[]>} promise 84 - * @param {Track} track 85 - */ 86 - async (promise, track) => { 87 - const acc = await promise; 88 - 89 - if (track.tags && track.stats) return [...acc, track]; 90 - 91 - const resGet = await this.input.resolve({ 92 - method: "GET", 93 - uri: track.uri, 94 - }); 95 - 96 - const resHead = await this.input.resolve({ 97 - method: "HEAD", 98 - uri: track.uri, 99 - }); 100 - 101 - if (!resGet) return [...acc, track]; 102 - 103 - const { stats, tags } = await this.metadataProcessor.supply({ 104 - urls: { get: resGet.url, head: resHead?.url || resGet.url }, 105 - }); 93 + // Establish channel between external workers and our processing worker 94 + const ports = { 95 + input: this.#external.input(), 96 + metadataProcessor: this.#external.metadataProcessor(), 97 + }; 106 98 107 - return [...acc, { ...track, stats, tags }]; 99 + // Send everything to worker 100 + const result = await this.#process({ 101 + ports: { 102 + input: ports.input.port, 103 + metadataProcessor: ports.metadataProcessor.port, 108 104 }, 109 - Promise.resolve([]), 110 - ); 105 + tracks: cachedTracks, 106 + }); 111 107 112 - // Changed? 113 - const diff = deepDiff.diff(tracksWithMetadata, cachedTracks); 114 - const changed = !!diff; 108 + // Save if collection changed 109 + if (result) await this.output.tracks.save(result); 115 110 116 - // Save if changed 117 - if (changed) await this.output.tracks.save(tracksWithMetadata); 111 + // Close external channels 112 + ports.input.disconnect(); 113 + ports.metadataProcessor.disconnect(); 118 114 119 115 // Fin 120 116 console.log("🪵 Processing completed");
+12
src/components/orchestrator/process-tracks/types.d.ts
··· 1 + import type { Track } from "@common/types.d.ts"; 2 + 3 + export type Actions = { 4 + process: ( 5 + args: { 6 + ports: { input: MessagePort; metadataProcessor: MessagePort }; 7 + tracks: Track[]; 8 + }, 9 + ) => Promise<Track[] | null>; 10 + }; 11 + 12 + export type ActionsProxied = Actions;
+87
src/components/orchestrator/process-tracks/worker.js
··· 1 + import deepDiff from "@fry69/deep-diff"; 2 + import { define, ostiary, proxyProvider, use } from "@common/worker.js"; 3 + import { INPUT_ACTIONS } from "@common/constants.js"; 4 + 5 + /** 6 + * @import {InputActions, Track} from "@common/types.d.ts" 7 + * @import {ProxyProvider} from "@common/worker.d.ts" 8 + * @import {Actions as MetadataProcessorActions} from "@components/processor/metadata/types.d.ts" 9 + * @import {Actions} from "./types.d.ts" 10 + */ 11 + 12 + //////////////////////////////////////////// 13 + // ACTIONS 14 + //////////////////////////////////////////// 15 + 16 + /** 17 + * @type {Actions["process"]} 18 + */ 19 + export async function process(args) { 20 + const { ports } = args; 21 + const cachedTracks = args.tracks; 22 + 23 + /** @type {ProxyProvider<InputActions>} */ 24 + const inputProvider = proxyProvider(INPUT_ACTIONS); 25 + const input = inputProvider(ports.input); 26 + 27 + /** @type {ProxyProvider<MetadataProcessorActions>} */ 28 + const metadataProcessorProvider = proxyProvider(["supply"]); 29 + const metadataProcessor = metadataProcessorProvider(ports.metadataProcessor); 30 + 31 + ports.input.start(); 32 + ports.metadataProcessor.start(); 33 + 34 + // Contextualize 35 + await input.contextualize(cachedTracks); 36 + 37 + // List 38 + const tracks = await input.list(cachedTracks); 39 + 40 + // Fetch metadata if needed 41 + const tracksWithMetadata = await tracks.reduce( 42 + /** 43 + * @param {Promise<Track[]>} promise 44 + * @param {Track} track 45 + */ 46 + async (promise, track) => { 47 + const acc = await promise; 48 + 49 + if (track.tags && track.stats) return [...acc, track]; 50 + 51 + const resGet = await input.resolve({ 52 + method: "GET", 53 + uri: track.uri, 54 + }); 55 + 56 + const resHead = await input.resolve({ 57 + method: "HEAD", 58 + uri: track.uri, 59 + }); 60 + 61 + if (!resGet) return [...acc, track]; 62 + 63 + const { stats, tags } = await metadataProcessor.supply({ 64 + urls: { get: resGet.url, head: resHead?.url || resGet.url }, 65 + }); 66 + 67 + return [...acc, { ...track, stats, tags }]; 68 + }, 69 + Promise.resolve([]), 70 + ); 71 + 72 + // Changed? 73 + const diff = deepDiff.diff(tracksWithMetadata, cachedTracks); 74 + const changed = !!diff; 75 + 76 + // Save if changed 77 + if (changed) return tracksWithMetadata; 78 + return null; 79 + } 80 + 81 + //////////////////////////////////////////// 82 + // ⚡️ 83 + //////////////////////////////////////////// 84 + 85 + ostiary((port) => { 86 + define("process", process, port); 87 + });
+19 -6
src/components/processor/metadata/element.js
··· 1 1 import { DiffuseElement } from "@common/element.js"; 2 - import { use } from "@common/worker.js"; 2 + import { portProvider, proxyProvider } from "@common/worker.js"; 3 3 4 4 /** 5 + * @import {PortProviderMethod, ProxiedActions, ProxyProvider, ProxyProviderMethod, WorkerProviderMethod} from "@common/worker.d.ts" 5 6 * @import {Actions} from "./types.d.ts" 6 7 */ 7 8 ··· 10 11 //////////////////////////////////////////// 11 12 12 13 /** 13 - * @implements {Actions} 14 + * @implements {ProxiedActions<Actions>} 15 + * @implements {WorkerProviderMethod} 16 + * @implements {ProxyProviderMethod<Actions>} 14 17 */ 15 18 class MetadataProcessor extends DiffuseElement { 16 19 constructor() { 17 20 super(); 18 21 19 22 // Setup worker 20 - const name = `diffuse/processor/metadata/${this.group}`; 21 - const url = "/components/processor/metadata/worker.js"; 22 - const worker = new Worker(url, { name, type: "module" }); 23 + const worker = this.worker(this.group); 24 + 25 + /** @type {ProxyProvider<Actions>} */ 26 + this.proxy = proxyProvider(["supply"]); 23 27 24 28 // Worker proxy 25 - this.supply = use("supply", worker); 29 + this.supply = this.proxy(worker).supply; 30 + } 31 + 32 + /** 33 + * @param {string} [group] 34 + */ 35 + worker(group) { 36 + const name = `diffuse/processor/metadata/${group || crypto.randomUUID()}`; 37 + const url = "/components/processor/metadata/worker.js"; 38 + return new Worker(url, { name, type: "module" }); 26 39 } 27 40 } 28 41