A music player that connects to your cloud/distributed storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: worker connections with dependencies

+311 -358
+5 -2
src/common/constituents/default.js
··· 55 55 const trigger = queue.now(); 56 56 const _other_trigger = queue.poolHash(); 57 57 58 - queue.fill({ amount: 10, shuffled: true }); 59 - if (!trigger) queue.shift(); 58 + oqt.isLeader().then((isLeader) => { 59 + if (!isLeader) return; 60 + queue.fill({ amount: 10, shuffled: true }); 61 + if (!trigger) queue.shift(); 62 + }); 60 63 }); 61 64 }
-6
src/common/element.d.ts
··· 9 9 ...values: unknown[] 10 10 ) => string; 11 11 12 - export type ProvisionedWorkers<T extends string> = { 13 - [K in T]: ProvisionedWorker; 14 - }; 15 - 16 - export type ProvisionedWorker = Worker | SharedWorker; 17 - 18 12 export type RenderArg<State = undefined> = { 19 13 html: HtmlTagFunction; 20 14 state: State;
+129 -101
src/common/element.js
··· 1 1 import QS from "query-string"; 2 - import { RPCChannel } from "@kunkun/kkrpc"; 2 + import { decodeMessage, encodeMessage, RPCChannel } from "@kunkun/kkrpc"; 3 3 import { html, render } from "lit-html"; 4 4 5 5 import { effect, signal } from "@common/signal.js"; 6 - import { rpc, transfer, workerLink, workerTunnel } from "./worker.js"; 6 + import { 7 + rpc, 8 + transfer, 9 + workerLink, 10 + workerProxy, 11 + workerTunnel, 12 + } from "./worker.js"; 7 13 import { BrowserPostMessageIo } from "./worker/rpc.js"; 8 14 9 15 // RE-EXPORT ··· 21 27 * around rendering and managing signals. 22 28 */ 23 29 export class DiffuseElement extends HTMLElement { 30 + #connected = Promise.withResolvers(); 24 31 #disposables = /** @type {Array<() => void>} */ ([]); 25 - 26 - #teardown() { 27 - this.#disposables.forEach((fn) => fn()); 28 - } 29 32 30 33 constructor() { 31 34 super(); ··· 55 58 this.#disposables.push(effect(fn)); 56 59 } 57 60 61 + /** */ 62 + forceRender() { 63 + return this.#render(); 64 + } 65 + 66 + /** */ 67 + nameWithGroup() { 68 + return `${this.constructor.prototype.constructor.NAME}/${this.group}`; 69 + } 70 + 71 + /** */ 72 + root() { 73 + return (this.shadowRoot ?? this); 74 + } 75 + 76 + /** */ 77 + whenConnected() { 78 + return this.#connected.promise; 79 + } 80 + 58 81 /** 59 82 * Avoid replacing the whole subtree, 60 83 * morph the existing DOM into the new given tree. ··· 70 93 render(tmp, this.root()); 71 94 } 72 95 73 - /** */ 74 - forceRender() { 75 - return this.#render(); 76 - } 77 - 78 - /** */ 79 - root() { 80 - return (this.shadowRoot ?? this); 81 - } 82 - 83 96 // LIFECYCLE 84 97 85 98 connectedCallback() { 99 + this.#connected.resolve(null); 100 + 86 101 if (!("render" in this && typeof this.render === "function")) return; 87 102 88 103 this.effect(() => { ··· 95 110 this.#teardown(); 96 111 } 97 112 98 - // WORKER 113 + #teardown() { 114 + this.#disposables.forEach((fn) => fn()); 115 + } 116 + 117 + // WORKERS 99 118 100 119 /** @type {undefined | Worker | SharedWorker} */ 101 120 #worker; ··· 115 134 ); 116 135 117 136 // Setup worker 118 - const name = `${NAME}/${this.group}`; 137 + const name = this.nameWithGroup(); 119 138 const url = import.meta.resolve("./" + WORKER_URL) + `?${query}`; 120 139 121 140 let worker; ··· 129 148 return worker; 130 149 } 131 150 151 + /** */ 152 + dependencies() { 153 + return Object.fromEntries( 154 + Array.from(this.children).flatMap((element) => { 155 + if ("nameWithGroup" in element === false) { 156 + return []; 157 + } 158 + 159 + const d = /** @type {DiffuseElement} */ (element); 160 + return [[d.localName, d]]; 161 + }), 162 + ); 163 + } 164 + 132 165 worker() { 133 166 this.#worker ??= this.createWorker(); 134 167 return this.#worker; ··· 138 171 const worker = this.worker(); 139 172 return workerLink(worker); 140 173 } 174 + 175 + /** 176 + * @template {Record<string, (...args: any[]) => any>} Actions 177 + * @returns {ProxiedActions<Actions>} 178 + */ 179 + workerProxy() { 180 + return workerProxy( 181 + () => this.workerTunnel().port, 182 + ); 183 + } 184 + 185 + /** 186 + * @param {{ newWorker?: boolean }} [opts] 187 + */ 188 + workerTunnel({ newWorker } = {}) { 189 + // Creates a MessagePort that is connected to the worker. 190 + // All the dependencies are added automatically. 191 + const worker = newWorker ? this.createWorker() : this.worker(); 192 + const deps = this.dependencies(); 193 + 194 + let toWorker; 195 + 196 + if (Object.keys(deps).length) { 197 + toWorker = 198 + /** 199 + * @param {any} msg 200 + */ 201 + async (msg) => { 202 + /** @type {Array<[string, Tunnel]>} */ 203 + const ports = Object.entries(deps).map( 204 + /** @param {[string, DiffuseElement]} _ */ 205 + ([k, v]) => [k, v.workerTunnel()], 206 + ); 207 + 208 + const decoded = await decodeMessage(msg); 209 + const data = { 210 + data: Array.isArray(decoded.args) ? decoded.args[0] : decoded.args, 211 + ports: Object.fromEntries(ports.map(([k, v]) => { 212 + return [k, v.port]; 213 + })), 214 + }; 215 + 216 + const encoded = encodeMessage( 217 + { 218 + ...decoded, 219 + args: Array.isArray(decoded.args) 220 + ? [data, ...decoded.args.slice(1)] 221 + : decoded.args, 222 + }, 223 + {}, 224 + true, 225 + ports.map(([_k, v]) => v.port), 226 + ); 227 + 228 + this.#disposables.push(() => { 229 + ports.forEach(([_k, v]) => v.disconnect()); 230 + }); 231 + 232 + return { 233 + data: encoded, 234 + transfer: ports.map(([_k, v]) => v.port), 235 + }; 236 + }; 237 + } 238 + 239 + const tunnel = workerTunnel(worker, { toWorker }); 240 + return tunnel; 241 + } 141 242 } 142 243 143 244 /** ··· 171 272 /** 172 273 * @template {Record<string, { strategy: "leaderOnly" | "replicate", fn: (...args: any[]) => any }>} ActionsWithStrategy 173 274 * @template {{ [K in keyof ActionsWithStrategy]: ActionsWithStrategy[K]["fn"] }} Actions 174 - * @param {string} name 275 + * @param {string} channelName 175 276 * @param {ActionsWithStrategy} actionsWithStrategy 176 277 */ 177 - broadcast(name, actionsWithStrategy) { 278 + broadcast(channelName, actionsWithStrategy) { 178 279 if (this.broadcasted) return; 179 280 180 - const channel = new BroadcastChannel(name); 281 + const channel = new BroadcastChannel(channelName); 181 282 const msg = new MessageChannel(); 182 283 183 284 /** ··· 185 286 */ 186 287 187 288 this.broadcasted = true; 188 - this.name = name; 289 + this.channelName = channelName; 189 290 190 291 const _rpc = rpc( 191 292 msg.port2, ··· 291 392 // Grab a lock if it isn't acquired yet, 292 393 // and hold it until `this.lock.promise` resolves. 293 394 navigator.locks.request( 294 - `${this.name}/lock`, 395 + `${this.channelName}/lock`, 295 396 { ifAvailable: true }, 296 397 (lock) => { 297 398 this.#status.resolve( ··· 305 406 // Additionally, wait for lock if needed. 306 407 this.#status.promise.then((status) => { 307 408 if (status.leader) { 308 - console.log(`🧙 Elected leader for: ${this.name}`); 409 + console.log(`🧙 Elected leader for: ${this.channelName}`); 309 410 } else { 310 - console.log(`🔮 Watching leader: ${this.name}`); 411 + console.log(`🔮 Watching leader: ${this.channelName}`); 311 412 } 312 413 313 414 // Wait for leadership 314 415 if (status.leader === false) { 315 416 navigator.locks.request( 316 - `${this.name}/lock`, 417 + `${this.channelName}/lock`, 317 418 () => { 318 419 this.#status = Promise.withResolvers(); 319 420 this.#status.resolve({ leader: true, initialLeader: false }); ··· 337 438 } 338 439 339 440 /** 340 - * @template {string} A 341 - * @template {ProvisionedWorkers<A>} B 342 - * @template {Record<string, any>} C 343 - * @template R 344 - * @param {Promise<B> | undefined} provisions 345 - * @param {(args: C & { ports: { [K in keyof B]: MessagePort } }) => R} fn 346 - * @param {C} fnArgs 347 - * @returns {Promise<R>} 348 - */ 349 - export async function callWorkerWithProvisions(provisions, fn, fnArgs) { 350 - const workers = await provisions; 351 - if (!workers) throw new Error("Workers not defined"); 352 - 353 - /** @type {Array<[keyof B, Tunnel]>} */ 354 - const tunnels = Object.keys(workers).map( 355 - (value) => { 356 - const key = /** @type {keyof B} */ (value); 357 - const worker = workers[key]; 358 - return [key, workerTunnel(worker)]; 359 - }, 360 - ); 361 - 362 - const ports = /** @type {{ [K in keyof B]: MessagePort }} */ ( 363 - Object.fromEntries( 364 - tunnels.map(([key, tunnel]) => { 365 - return [key, tunnel.port]; 366 - }), 367 - ) 368 - ); 369 - 370 - const args = { 371 - ...fnArgs, 372 - ports, 373 - }; 374 - 375 - const result = await fn(transfer( 376 - args, 377 - tunnels.map(([_key, tunnel]) => { 378 - return tunnel.port; 379 - }), 380 - )); 381 - 382 - tunnels.forEach(([_key, tunnel]) => { 383 - tunnel.disconnect(); 384 - }); 385 - 386 - return result; 387 - } 388 - 389 - /** 390 441 * Component DOM selector. 391 442 * 392 443 * Basically `document.querySelector` but returns the element ··· 433 484 } 434 485 435 486 /** 436 - * @template {Record<string, DiffuseElement>} T 437 - * @param {T} elements 487 + * @param {Record<string, Worker | SharedWorker>} workers 438 488 */ 439 - export async function provisionWorkers(elements) { 440 - await whenElementsDefined(elements); 441 - 442 - /** @type {Record<string, ProvisionedWorker>} */ 443 - const provisions = {}; 444 - 445 - Object.entries(elements).forEach(([key, element]) => { 446 - const worker = element.createWorker(); 447 - provisions[key] = worker; 448 - }); 449 - 450 - const casted = 451 - /** @type {{ [K in keyof T]: ProvisionedWorker}} */ (provisions); 452 - 453 - return casted; 454 - } 455 - 456 - /** 457 - * @param {ProvisionedWorkers<any> | undefined} workers 458 - */ 459 - export function terminateProvisions(workers) { 460 - if (!workers) return; 461 - 489 + export function terminateWorkers(workers) { 462 490 Object.values(workers).forEach((worker) => { 463 491 if (worker instanceof Worker) worker.terminate(); 464 492 });
+24
src/common/worker.d.ts
··· 1 1 export type Announcement<T> = MRpcBaseMsg & { type: "announcement"; args: T }; 2 2 export type IncompleteArray<T> = ["Missing required items", T]; 3 3 4 + export type ActionsWithTunnel< 5 + Actions extends Record<string, (...args: any[]) => any>, 6 + > = { 7 + [A in keyof Actions]: WithTunnel<Actions[A]>; 8 + }; 9 + 10 + export type Dependencies<T extends string> = { 11 + [K in T]: Worker | SharedWorker; 12 + }; 13 + 4 14 /** 5 15 * Comes from the `@mys/m-rpc` library, 6 16 * but it is not exported. Used to identify ··· 34 44 disconnect: () => void; 35 45 port: MessagePort; 36 46 }; 47 + 48 + /** */ 49 + export type WithTunnel< 50 + Fn extends (...args: any[]) => any, 51 + PromisedReturn = (ReturnType<Fn> extends Promise<unknown> ? ReturnType<Fn> 52 + : Promise<ReturnType<Fn>>), 53 + > = ( 54 + _: { data: Parameters<Fn>[0]; ports: Record<string, MessagePort> }, 55 + ...args: Rest<Parameters<Fn>> 56 + ) => PromisedReturn; 57 + 58 + // 🛑 59 + 60 + type Rest<T> = T extends [any, ...(infer R)[]] ? R : never;
+19 -9
src/common/worker.js
··· 1 - import { RPCChannel } from "@kunkun/kkrpc"; 1 + import { RPCChannel, transfer } from "@kunkun/kkrpc"; 2 2 import { getTransferables } from "@okikio/transferables"; 3 3 import { debounceMicrotask } from "@vicary/debounce-microtask"; 4 4 import { xxh32 } from "xxh32"; ··· 9 9 export { transfer } from "@kunkun/kkrpc"; 10 10 11 11 /** 12 - * @import {Announcement, MessengerRealm, ProxiedActions, Tunnel} from "./worker.d.ts" 12 + * @import {Announcement, Dependencies, MessengerRealm, ProxiedActions, Tunnel} from "./worker.d.ts" 13 13 */ 14 14 15 15 //////////////////////////////////////////// ··· 90 90 // Create proxy that creates RPC API when needed 91 91 const proxy = new Proxy(() => {}, { 92 92 get: (_target, prop) => { 93 - const api = ensureAPI(); 94 - return api[prop.toString()]; 93 + /** @param {Parameters<Actions[any]>} args */ 94 + return (...args) => { 95 + const api = ensureAPI(); 96 + return api[prop.toString()](...args); 97 + }; 95 98 }, 96 99 }); 97 100 ··· 100 103 101 104 /** 102 105 * @param {MessagePort | Worker | SharedWorker} workerOrLink 106 + * @param {{ fromWorker?: (message: any) => Promise<{ data: any, transfer?: Transferable[] }>; toWorker?: (message: any) => Promise<{ data: any, transfer?: Transferable[] }> }} [hooks] 103 107 * @returns {Tunnel} 104 108 */ 105 - export function workerTunnel(workerOrLink) { 109 + export function workerTunnel(workerOrLink, hooks = {}) { 106 110 const link = workerOrLink instanceof SharedWorker 107 111 ? workerLink(workerOrLink) 108 112 : workerOrLink; 109 113 const channel = new MessageChannel(); 110 114 111 - channel.port1.addEventListener("message", (event) => { 112 - link.postMessage(event.data); 115 + channel.port1.addEventListener("message", async (event) => { 116 + // Send to worker 117 + const { data, transfer } = await hooks?.toWorker?.(event.data) ?? 118 + { data: event.data }; 119 + link.postMessage(data, { transfer }); 113 120 }); 114 121 115 122 /** 116 123 * @param {Event} event 117 124 */ 118 - const workerListener = (event) => { 125 + const workerListener = async (event) => { 126 + // Receive from worker 119 127 const msgEvent = /** @type {MessageEvent} */ (event); 120 - channel.port1.postMessage(msgEvent.data); 128 + const { data, transfer } = await hooks?.fromWorker?.(msgEvent.data) ?? 129 + { data: msgEvent.data }; 130 + channel.port1.postMessage(data, { transfer }); 121 131 }; 122 132 123 133 link.addEventListener("message", workerListener);
+16 -56
src/components/configurator/input/element.js
··· 1 - import { DiffuseElement, workerProxy } from "@common/element.js"; 2 - import { transfer, workerLink, workerTunnel } from "@common/worker.js"; 1 + import { DiffuseElement, whenElementsDefined } from "@common/element.js"; 3 2 4 3 /** 5 4 * @import {ProxiedActions, Tunnel} from "@common/worker.d.ts" 6 5 * @import {InputActions, InputElement} from "@components/input/types.d.ts" 7 - * @import {AdditionalActions} from "./types.d.ts" 8 6 */ 9 7 10 8 /** ··· 26 24 super(); 27 25 28 26 /** @type {ProxiedActions<InputActions>} */ 29 - const proxy = workerProxy(this.workerLink); 27 + const proxy = this.workerProxy(); 30 28 31 29 this.consult = proxy.consult; 32 30 this.contextualize = proxy.contextualize; ··· 35 33 this.resolve = proxy.resolve; 36 34 } 37 35 38 - // WORKER 36 + // LIFECYCLE 39 37 40 38 /** 41 39 * @override 42 40 */ 43 - createWorker() { 44 - const worker = super.createWorker(); 45 - 46 - // Wait for child elements to be rendered 47 - setTimeout(() => this.configureWorker(worker), 0); 48 - 49 - return worker; 41 + async connectedCallback() { 42 + super.connectedCallback(); 43 + await whenElementsDefined(this.inputs()); 50 44 } 51 45 52 - // 🛠️ 46 + // WORKERS 53 47 54 48 /** 55 - * @param {Worker | SharedWorker} worker 49 + * @override 56 50 */ 57 - async configureWorker(worker) { 58 - const inputs = await this.inputTunnels(); 59 - 60 - // Check if any inputs are present 61 - if (inputs.length === 0) return; 62 - 63 - // Configure worker with input ports 64 - const args = transfer({ 65 - ports: Object.fromEntries(inputs.map((input) => { 66 - return [input.element.SCHEME, input.tunnel.port]; 67 - })), 68 - }, inputs.map((i) => i.tunnel.port)); 69 - 70 - /** @type {ProxiedActions<AdditionalActions>} */ 71 - const proxy = workerProxy(() => workerLink(worker)); 72 - proxy.configure(args); 51 + dependencies() { 52 + return this.inputs(); 73 53 } 74 54 75 - async inputTunnels() { 76 - const inputElements = this.children; 77 - const inputs = await Array.from(inputElements).reduce( 78 - /** 79 - * @param {Promise<Array<Input>>} acc 80 - * @param {Element} el 81 - */ 82 - async (acc, el) => { 83 - const rec = await acc; 84 - await customElements.whenDefined(el.localName); 85 - 86 - const element = /** @type {InputElement} */ (el); 87 - const worker = element.worker(); 88 - const tunnel = workerTunnel(worker); 89 - 90 - const item = { 91 - element, 92 - tunnel, 93 - worker, 94 - }; 95 - 96 - return [...rec, item]; 97 - }, 98 - Promise.resolve([]), 55 + inputs() { 56 + return Object.fromEntries( 57 + Array.from(this.children).map((element) => { 58 + const input = /** @type {InputElement} */ (element); 59 + return [input.SCHEME, input]; 60 + }), 99 61 ); 100 - 101 - return inputs; 102 62 } 103 63 } 104 64
-3
src/components/configurator/input/types.d.ts
··· 1 - export type AdditionalActions = { 2 - configure: (args: { ports: { [S in string]: MessagePort } }) => void; 3 - };
+39 -56
src/components/configurator/input/worker.js
··· 6 6 /** 7 7 * @import {Track} from "@definitions/types.d.ts"; 8 8 * @import {GroupConsult, InputActions} from "@components/input/types.d.ts" 9 - * @import {ProxiedActions} from "@common/worker.d.ts" 10 - * @import {AdditionalActions} from "./types.d.ts" 11 - */ 12 - 13 - /** @type {Record<string, ProxiedActions<InputActions>>} */ 14 - const inputs = {}; 15 - 16 - //////////////////////////////////////////// 17 - // ACTIONS 18 - //////////////////////////////////////////// 19 - 20 - /** 21 - * @type {AdditionalActions["configure"]} 9 + * @import {ActionsWithTunnel, ProxiedActions} from "@common/worker.d.ts" 22 10 */ 23 - export function configure({ ports }) { 24 - Object.keys(ports).forEach((key) => { 25 - inputs[key.toLowerCase()] = workerProxy(() => { 26 - const port = ports[key]; 27 - port.start(); 28 - return port; 29 - }); 30 - }); 31 - } 32 11 33 12 //////////////////////////////////////////// 34 13 // INPUT ACTIONS 35 14 //////////////////////////////////////////// 36 15 37 16 /** 38 - * @type {InputActions['consult']} 17 + * @type {ActionsWithTunnel<InputActions>['consult']} 39 18 */ 40 - export async function consult(fileUriOrScheme) { 19 + export async function consult({ data, ports }) { 20 + const fileUriOrScheme = data; 41 21 const scheme = fileUriOrScheme.includes(":") 42 22 ? URI.parse(fileUriOrScheme).scheme || fileUriOrScheme 43 23 : fileUriOrScheme; 44 24 45 - const input = grabInput(scheme); 25 + const input = grabInput(scheme, ports); 46 26 47 27 if (!input) { 48 28 return { supported: false, reason: "Unsupported scheme" }; ··· 52 32 } 53 33 54 34 /** 55 - * @type {InputActions['contextualize']} 35 + * @type {ActionsWithTunnel<InputActions>['contextualize']} 56 36 */ 57 - export async function contextualize(tracks) { 58 - const groups = groupTracks(tracks); 37 + export async function contextualize({ data, ports }) { 38 + const tracks = data; 39 + const groups = groupTracks(tracks, ports); 59 40 const promises = Object.entries(groups).map( 60 41 async ([scheme, tracksGroup]) => { 61 - const input = grabInput(scheme); 42 + const input = grabInput(scheme, ports); 62 43 if (!input || tracksGroup.length === 0) return; 63 44 return await input.contextualize(tracksGroup); 64 45 }, ··· 68 49 } 69 50 70 51 /** 71 - * @type {InputActions['groupConsult']} 52 + * @type {ActionsWithTunnel<InputActions>['groupConsult']} 72 53 */ 73 - export async function groupConsult(tracks) { 54 + export async function groupConsult({ data, ports }) { 55 + const tracks = data; 74 56 const groups = groupTracksPerScheme(tracks); 75 57 76 58 /** @type {GroupConsult[]} */ 77 59 const consultations = await Promise.all( 78 60 Object.keys(groups).map(async (scheme) => { 79 - const input = grabInput(scheme); 61 + const input = grabInput(scheme, ports); 80 62 81 63 if (!input) { 82 64 return { ··· 98 80 } 99 81 100 82 /** 101 - * @type {InputActions['list']} 83 + * @type {ActionsWithTunnel<InputActions>['list']} 102 84 */ 103 - export async function list(cachedTracks = []) { 104 - const groups = await groupConsult(cachedTracks); 85 + export async function list({ data, ports }) { 86 + const groups = await groupConsult({ data, ports }); 105 87 106 - Object.keys(inputs).forEach((scheme) => { 88 + Object.keys(ports).forEach((scheme) => { 107 89 if (!groups[scheme]) groups[scheme] = { available: true, tracks: [] }; 108 90 }); 109 91 ··· 111 93 async ([scheme, { available, tracks }]) => { 112 94 if (!available) return tracks; 113 95 114 - const input = grabInput(scheme); 96 + const input = grabInput(scheme, ports); 115 97 if (!input) return tracks; 116 98 return await input.list(tracks); 117 99 }, ··· 124 106 } 125 107 126 108 /** 127 - * @type {InputActions['resolve']} 109 + * @type {ActionsWithTunnel<InputActions>['resolve']} 128 110 */ 129 - export async function resolve(args) { 130 - const scheme = args.uri.split(":", 1)[0]; 131 - const input = grabInput(scheme); 111 + export async function resolve({ data, ports }) { 112 + const uri = data.uri; 113 + const scheme = uri.split(":", 1)[0]; 114 + const input = grabInput(scheme, ports); 132 115 if (!input) return undefined; 133 116 134 - try { 135 - return await input.resolve(args); 136 - } catch (err) { 137 - console.error( 138 - `[configurator/input] Resolve error for scheme '${scheme}'.`, 139 - err, 140 - ); 141 - } 117 + const result = await input.resolve(data); 118 + return result; 142 119 } 143 120 144 121 //////////////////////////////////////////// ··· 152 129 groupConsult, 153 130 list, 154 131 resolve, 155 - 156 - // Additional 157 - configure, 158 132 }); 159 133 }); 160 134 ··· 164 138 165 139 /** 166 140 * @param {string} scheme 141 + * @param {Record<string, MessagePort>} ports 142 + * @returns {ProxiedActions<InputActions> | null} 167 143 */ 168 - function grabInput(scheme) { 169 - return inputs[scheme.toLowerCase()]; 144 + function grabInput(scheme, ports) { 145 + const port = ports[scheme]; 146 + if (!port) return null; 147 + 148 + return workerProxy(() => { 149 + port.start(); 150 + return port; 151 + }); 170 152 } 171 153 172 154 /** 173 155 * @param {Track[]} tracks 156 + * @param {Record<string, MessagePort>} ports 174 157 */ 175 - function groupTracks(tracks) { 158 + function groupTracks(tracks, ports) { 176 159 const grouped = groupTracksPerScheme( 177 160 tracks, 178 161 Object.fromEntries( 179 - Object.keys(inputs).map((k) => { 162 + Object.keys(ports).map((k) => { 180 163 return [k, []]; 181 164 }), 182 165 ),
+1 -1
src/components/engine/audio/element.js
··· 46 46 // Setup leader election if shared 47 47 if (this.hasAttribute("group")) { 48 48 const actions = this.broadcast( 49 - `${this.constructor.prototype.constructor.NAME}/${this.group}`, 49 + this.nameWithGroup(), 50 50 { 51 51 adjustVolume: { strategy: "leaderOnly", fn: this.adjustVolume }, 52 52 pause: { strategy: "leaderOnly", fn: this.pause },
+2 -2
src/components/engine/queue/element.js
··· 1 - import { DiffuseElement, workerProxy } from "@common/element.js"; 1 + import { DiffuseElement } from "@common/element.js"; 2 2 import { signal } from "@common/signal.js"; 3 3 import { listen } from "@common/worker.js"; 4 4 import { hash } from "@common/index.js"; ··· 23 23 super(); 24 24 25 25 /** @type {ProxiedActions<Actions & State>} */ 26 - this.proxy = workerProxy(this.workerLink); 26 + this.proxy = this.workerProxy(); 27 27 28 28 this.add = this.proxy.add; 29 29 this.fill = this.proxy.fill;
+15 -24
src/components/orchestrator/process-tracks/element.js
··· 1 - import { 2 - callWorkerWithProvisions, 3 - DiffuseElement, 4 - provisionWorkers, 5 - query, 6 - terminateProvisions, 7 - workerProxy, 8 - } from "@common/element.js"; 1 + import { DiffuseElement, query } from "@common/element.js"; 9 2 import { signal, untracked } from "@common/signal.js"; 10 3 11 4 /** 12 5 * @import {Track} from "@definitions/types.d.ts" 13 - * @import {ProvisionedWorkers} from "@common/element.d.ts" 14 6 * @import {ProxiedActions} from "@common/worker.d.ts" 15 7 * @import {InputElement} from "@components/input/types.d.ts" 16 8 * @import {OutputElement} from "@components/output/types.d.ts" ··· 34 26 /** @type {ProxiedActions<Actions>} */ 35 27 #proxy; 36 28 37 - /** @type {Promise<ProvisionedWorkers<"input" | "metadataProcessor">> | undefined} */ 38 - #workers = undefined; 39 - 40 29 constructor() { 41 30 super(); 42 - this.#proxy = workerProxy(this.workerLink); 31 + this.#proxy = this.workerProxy(); 43 32 } 44 33 45 34 // SIGNALS ··· 72 61 this.output = output; 73 62 this.metadataProcessor = metadataProcessor; 74 63 75 - // Create new workers 76 - this.#workers = provisionWorkers({ input, metadataProcessor }); 77 - 78 64 // Wait until defined 79 65 await customElements.whenDefined(output.localName); 80 66 ··· 86 72 untracked(() => this.process()); 87 73 }); 88 74 } 75 + 76 + // WORKERS 89 77 90 78 /** 91 79 * @override 92 80 */ 93 - async disconnectedCallback() { 94 - super.disconnectedCallback(); 95 - terminateProvisions(await this.#workers); 81 + dependencies() { 82 + if (!this.input) throw new Error("Input element not defined yet"); 83 + if (!this.metadataProcessor) { 84 + throw new Error("Metadata processor element not defined yet"); 85 + } 86 + 87 + return { 88 + input: this.input, 89 + metadataProcessor: this.metadataProcessor, 90 + }; 96 91 } 97 92 98 93 // ACTIONS ··· 105 100 console.log("🪵 Processing initiated"); 106 101 107 102 const cachedTracks = this.output.tracks.collection(); 108 - const result = await callWorkerWithProvisions( 109 - this.#workers, 110 - this.#proxy.process, 111 - { tracks: cachedTracks }, 112 - ); 103 + const result = await this.#proxy.process(cachedTracks); 113 104 114 105 // Save if collection changed 115 106 if (result) await this.output.tracks.save(result);
+1 -6
src/components/orchestrator/process-tracks/types.d.ts
··· 1 1 import type { Track } from "@definitions/types.d.ts"; 2 2 3 3 export type Actions = { 4 - process: ( 5 - args: { 6 - ports: { input: MessagePort; metadataProcessor: MessagePort }; 7 - tracks: Track[]; 8 - }, 9 - ) => Promise<Track[] | null>; 4 + process: (tracks: Track[]) => Promise<Track[] | null>; 10 5 };
+4 -5
src/components/orchestrator/process-tracks/worker.js
··· 4 4 5 5 /** 6 6 * @import {Track} from "@definitions/types.d.ts" 7 - * @import {ProxiedActions} from "@common/worker.d.ts" 7 + * @import {ActionsWithTunnel, ProxiedActions} from "@common/worker.d.ts" 8 8 * @import {InputActions} from "@components/input/types.d.ts" 9 9 * @import {Actions as MetadataProcessorActions} from "@components/processor/metadata/types.d.ts" 10 10 * @import {Actions} from "./types.d.ts" ··· 15 15 //////////////////////////////////////////// 16 16 17 17 /** 18 - * @type {Actions["process"]} 18 + * @type {ActionsWithTunnel<Actions>["process"]} 19 19 */ 20 - export async function process(args) { 21 - const { ports } = args; 22 - const cachedTracks = args.tracks; 20 + export async function process({ data, ports }) { 21 + const cachedTracks = data; 23 22 24 23 /** @type {ProxiedActions<InputActions>} */ 25 24 const input = workerProxy(() => ports.input);
+25 -37
src/components/orchestrator/queue-tracks/element.js
··· 1 - import { 2 - BroadcastableDiffuseElement, 3 - callWorkerWithProvisions, 4 - query, 5 - terminateProvisions, 6 - whenElementsDefined, 7 - workerProxy, 8 - } from "@common/element.js"; 1 + import { BroadcastableDiffuseElement, query } from "@common/element.js"; 9 2 import { untracked } from "@common/signal.js"; 10 3 11 4 /** 12 5 * @import {Track} from "@definitions/types.d.ts" 13 - * @import {ProvisionedWorkers} from "@common/element.d.ts" 14 6 * @import {ProxiedActions} from "@common/worker.d.ts" 15 7 * @import {InputElement} from "@components/input/types.d.ts" 16 8 * @import {OutputElement} from "@components/output/types.d.ts" ··· 34 26 /** @type {ProxiedActions<Actions>} */ 35 27 #proxy; 36 28 37 - /** @type {Promise<ProvisionedWorkers<"input" | "queue">> | undefined} */ 38 - #workers = undefined; 39 - 40 29 constructor() { 41 30 super(); 42 - this.#proxy = workerProxy(this.workerLink); 31 + this.#proxy = this.workerProxy(); 43 32 } 44 33 34 + // LIFECYCLE 35 + 45 36 /** 46 37 * @override 47 38 */ 48 39 async connectedCallback() { 40 + // Broadcast if needed 41 + if (this.hasAttribute("group")) { 42 + this.broadcast(this.nameWithGroup(), {}); 43 + } 44 + 45 + // Super 49 46 super.connectedCallback(); 50 47 51 48 /** @type {InputElement} */ ··· 61 58 this.input = input; 62 59 this.output = output; 63 60 this.queue = queue; 64 - 65 - // Create new workers 66 - this.#workers = whenElementsDefined({ input, queue }).then(() => { 67 - return { 68 - input: input.createWorker(), 69 - queue: queue.worker(), 70 - }; 71 - }); 72 61 73 62 // When defined 74 63 await customElements.whenDefined(this.input.localName); 75 64 await customElements.whenDefined(this.output.localName); 65 + await customElements.whenDefined(this.queue.localName); 76 66 77 67 // Watch tracks collection 78 68 this.effect(() => { ··· 82 72 if (!isLeader) return; 83 73 84 74 untracked(() => 85 - this.poolAvailable(tracks.filter((t) => t.kind !== "placeholder")) 75 + this.#proxy.poolAvailable( 76 + tracks.filter((t) => t.kind !== "placeholder"), 77 + ) 86 78 ); 87 79 }); 88 80 }); 81 + 82 + // 🌸 89 83 } 90 84 85 + // WORKERS 86 + 91 87 /** 92 88 * @override 93 89 */ 94 - async disconnectedCallback() { 95 - super.disconnectedCallback(); 96 - terminateProvisions(await this.#workers); 97 - } 90 + dependencies() { 91 + if (!this.input) throw new Error("Input element not defined yet"); 92 + if (!this.queue) throw new Error("Queue element not defined yet"); 98 93 99 - // 🌊 100 - 101 - /** 102 - * @param {Track[]} cachedTracks 103 - */ 104 - async poolAvailable(cachedTracks) { 105 - return await callWorkerWithProvisions( 106 - this.#workers, 107 - this.#proxy.poolAvailable, 108 - { tracks: cachedTracks }, 109 - ); 94 + return { 95 + input: this.input, 96 + queue: this.queue, 97 + }; 110 98 } 111 99 } 112 100
+1 -4
src/components/orchestrator/queue-tracks/types.d.ts
··· 1 1 import type { Track } from "@definitions/types.d.ts"; 2 2 3 3 export type Actions = { 4 - poolAvailable(args: { 5 - ports: { input: MessagePort; queue: MessagePort }; 6 - tracks: Track[]; 7 - }): Promise<void>; 4 + poolAvailable(tracks: Track[]): Promise<void>; 8 5 };
+4 -5
src/components/orchestrator/queue-tracks/worker.js
··· 2 2 3 3 /** 4 4 * @import {Track} from "@definitions/types.d.ts" 5 - * @import {ProxiedActions} from "@common/worker.d.ts" 5 + * @import {ActionsWithTunnel, ProxiedActions} from "@common/worker.d.ts" 6 6 * @import {InputActions} from "@components/input/types.d.ts" 7 7 * @import {Actions as QueueEngineActions} from "@components/engine/queue/types.d.ts" 8 8 * @import {Actions} from "./types.d.ts" ··· 13 13 //////////////////////////////////////////// 14 14 15 15 /** 16 - * @type {Actions["poolAvailable"]} 16 + * @type {ActionsWithTunnel<Actions>["poolAvailable"]} 17 17 */ 18 - export async function poolAvailable(args) { 19 - const { ports } = args; 20 - const cachedTracks = args.tracks; 18 + export async function poolAvailable({ data, ports }) { 19 + const cachedTracks = data; 21 20 22 21 /** @type {ProxiedActions<InputActions>} */ 23 22 const input = workerProxy(() => ports.input);
+14 -32
src/components/orchestrator/search-tracks/element.js
··· 1 - import { 2 - callWorkerWithProvisions, 3 - DiffuseElement, 4 - provisionWorkers, 5 - query, 6 - terminateProvisions, 7 - workerProxy, 8 - } from "@common/element.js"; 1 + import { DiffuseElement, query } from "@common/element.js"; 9 2 10 3 /** 11 4 * @import {Track} from "@definitions/types.d.ts" 12 - * @import {ProvisionedWorkers} from "@common/element.d.ts" 13 5 * @import {ProxiedActions} from "@common/worker.d.ts" 14 6 * @import {InputElement} from "@components/input/types.d.ts" 15 7 * @import {OutputElement} from "@components/output/types.d.ts" ··· 33 25 /** @type {ProxiedActions<Actions>} */ 34 26 #proxy; 35 27 36 - /** @type {Promise<ProvisionedWorkers<"input" | "search">> | undefined} */ 37 - #workers = undefined; 38 - 39 28 constructor() { 40 29 super(); 41 - this.#proxy = workerProxy(this.workerLink); 30 + this.#proxy = this.workerProxy(); 42 31 } 32 + 33 + // LIFECYCLE 43 34 44 35 /** 45 36 * @override ··· 61 52 this.output = output; 62 53 this.search = search; 63 54 64 - // Create new workers 65 - this.#workers = provisionWorkers({ input, search }); 66 - 67 55 // When defined 68 56 await customElements.whenDefined(this.output.localName); 69 57 ··· 73 61 t.kind !== "placeholder" 74 62 ); 75 63 76 - this.supplyAvailable(tracks); 64 + this.#proxy.supplyAvailable(tracks); 77 65 }); 78 66 } 79 67 68 + // WORKERS 69 + 80 70 /** 81 71 * @override 82 72 */ 83 - async disconnectedCallback() { 84 - super.disconnectedCallback(); 85 - terminateProvisions(await this.#workers); 86 - } 73 + dependencies() { 74 + if (!this.input) throw new Error("Input element not defined yet"); 75 + if (!this.search) throw new Error("Search element not defined yet"); 87 76 88 - // 🚛 89 - 90 - /** 91 - * @param {Track[]} cachedTracks 92 - */ 93 - async supplyAvailable(cachedTracks) { 94 - return await callWorkerWithProvisions( 95 - this.#workers, 96 - this.#proxy.supplyAvailable, 97 - { tracks: cachedTracks }, 98 - ); 77 + return { 78 + input: this.input, 79 + search: this.search, 80 + }; 99 81 } 100 82 } 101 83
+1 -4
src/components/orchestrator/search-tracks/types.d.ts
··· 1 1 import type { Track } from "@definitions/types.d.ts"; 2 2 3 3 export type Actions = { 4 - supplyAvailable(args: { 5 - ports: { input: MessagePort; search: MessagePort }; 6 - tracks: Track[]; 7 - }): Promise<void>; 4 + supplyAvailable(tracks: Track[]): Promise<void>; 8 5 };
+4 -5
src/components/orchestrator/search-tracks/worker.js
··· 2 2 3 3 /** 4 4 * @import {Track} from "@definitions/types.d.ts" 5 - * @import {ProxiedActions} from "@common/worker.d.ts" 5 + * @import {ActionsWithTunnel, ProxiedActions} from "@common/worker.d.ts" 6 6 * @import {InputActions} from "@components/input/types.d.ts" 7 7 * @import {Actions as SearchProcessorActions} from "@components/processor/search/types.d.ts" 8 8 * @import {Actions} from "./types.d.ts" ··· 13 13 //////////////////////////////////////////// 14 14 15 15 /** 16 - * @type {Actions["supplyAvailable"]} 16 + * @type {ActionsWithTunnel<Actions>["supplyAvailable"]} 17 17 */ 18 - export async function supplyAvailable(args) { 19 - const { ports } = args; 20 - const cachedTracks = args.tracks; 18 + export async function supplyAvailable({ data, ports }) { 19 + const cachedTracks = data; 21 20 22 21 /** @type {ProxiedActions<InputActions>} */ 23 22 const input = workerProxy(() => ports.input);
+7
src/themes/blur/artwork-controller/element.js
··· 85 85 * @param {Track | null} track 86 86 */ 87 87 async #changeArtwork(track) { 88 + console.log("QUEUE NOW", track); 89 + 88 90 if (!track) { 89 91 this.#artwork.value = []; 90 92 return; ··· 97 99 method: "HEAD", 98 100 uri: track.uri, 99 101 }); 102 + 103 + console.log(resGet, this.input); 100 104 101 105 if (!resGet) return; 102 106 ··· 116 120 }; 117 121 118 122 const art = await this.artwork?.artwork(request) ?? []; 123 + 124 + console.log(art); 125 + 119 126 const currCacheId = track ? await trackArtworkCacheId(track) : undefined; 120 127 if (cacheId === currCacheId) this.#artwork.set(art); 121 128 }