Experiment to rebuild Diffuse using web applets.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: respect queue group id

+113 -70
+4 -2
src/pages/constituent/blur/artwork-controller/_applet.astro
··· 532 532 // ORCHESTRATED 533 533 534 534 context.settled().then(() => { 535 + console.log("READY"); 536 + 535 537 if (isMainGroup() && context.isMainInstance()) { 536 538 orchestrator.primary 537 539 .sendAction("insert_tracks_into_queue", undefined, { ··· 768 770 } 769 771 770 772 function previous() { 771 - engine.queue.sendAction("unshift", undefined, { worker: true }); 773 + engine.queue.sendAction("unshift", { groupId: context.groupId }, { worker: true }); 772 774 } 773 775 774 776 function next() { 775 - engine.queue.sendAction("shift", undefined, { worker: true }); 777 + engine.queue.sendAction("shift", { groupId: context.groupId }, { worker: true }); 776 778 } 777 779 778 780 controller.appendChild(Controls);
+1 -1
src/pages/core/types.d.ts
··· 11 11 | { supported: true; consult: "undetermined" | boolean }; 12 12 13 13 export type ConsultGrouping = 14 - | { available: false; reason: string } 14 + | { available: false; reason: string; tracks: Track[] } 15 15 | { available: true; tracks: Track[] }; 16 16 17 17 export type GroupConsult = Record<string, ConsultGrouping>;
+8 -7
src/pages/engine/queue/_applet.astro
··· 19 19 20 20 // Register applet 21 21 const context = register<State>({ mode: "shared-worker", worker }); 22 + const groupId = context.groupId || "main"; 22 23 23 24 // Initial state 24 - context.data = await worker.data(); 25 + context.data = await worker.data(groupId); 25 26 26 27 // Keep applet data with worker data in sync 27 - sync(context, port); 28 + sync(context, port, { groupId }); 28 29 29 30 //////////////////////////////////////////// 30 31 // ACTIONS ··· 35 36 context.setActionHandler("unshift", unshift); 36 37 37 38 async function add(items: Track[]) { 38 - await worker.add(transfer(items)); 39 + await worker.add({ groupId, items }); 39 40 } 40 41 41 - async function pool(items: Track[]) { 42 - await worker.pool(transfer(items)); 42 + async function pool(tracks: Track[]) { 43 + await worker.pool({ groupId, tracks }); 43 44 } 44 45 45 46 async function shift() { 46 - await worker.shift(); 47 + await worker.shift({ groupId }); 47 48 } 48 49 49 50 async function unshift() { 50 - await worker.unshift(); 51 + await worker.unshift({ groupId }); 51 52 } 52 53 </script>
+15 -5
src/pages/orchestrator/primary/_applet.astro
··· 98 98 audio, 99 99 (data) => data.items[queue.data.now?.id ?? Infinity]?.hasEnded ?? false, 100 100 (hasEnded) => { 101 - if (hasEnded) queue.sendAction("shift", undefined, { worker: true }); 101 + if (hasEnded) queue.sendAction("shift", { groupId: context.groupId }, { worker: true }); 102 102 }, 103 103 ); 104 104 } ··· 195 195 async function insertTracksIntoQueue() { 196 196 await context.settled(); 197 197 198 + console.log("SETTLED"); 199 + 198 200 // Add tracks to the queue once the tracks have been loaded; 199 201 // and every time the collection changes. 200 202 ··· 204 206 205 207 await wait(output, (d) => d?.tracks.state === "loaded"); 206 208 209 + console.log("TRACKS LOADED"); 210 + 207 211 reactive( 208 212 output, 209 213 (data) => data.tracks.cacheId, ··· 214 218 { timeoutDuration: 60000 * 5, worker: true }, 215 219 ); 216 220 221 + console.log("CONSULTED"); 222 + 217 223 // Available tracks 218 224 const tracks = Object.values(groups).reduce((acc: Track[], value) => { 219 225 if (value.available === false) return acc; ··· 221 227 }, []); 222 228 223 229 // Set pool 224 - await queue.sendAction("pool", tracks, { 225 - timeoutDuration: 60000, 226 - worker: true, 227 - }); 230 + await queue.sendAction( 231 + "pool", 232 + { groupId: context.groupId, tracks }, 233 + { 234 + timeoutDuration: 60000, 235 + worker: true, 236 + }, 237 + ); 228 238 }, 229 239 ); 230 240 }
+5 -1
src/scripts/common.ts
··· 181 181 export function sync<DataType = unknown>( 182 182 context: DiffuseApplet<DataType>, 183 183 port: MessagePort | Worker, 184 + options: { groupId?: string } = {}, 184 185 ) { 185 186 port.onmessage = (event) => { 186 - if (event.data?.type === "data") { 187 + if ( 188 + event.data?.type === "data" && 189 + (options.groupId ? event.data?.groupId === options.groupId : true) 190 + ) { 187 191 context.data = event.data.data; 188 192 } 189 193 };
+17 -6
src/scripts/configurator/input/worker.ts
··· 1 1 import * as URI from "uri-js"; 2 2 3 - import type { Consult, GroupConsult, InputWorkerTasks, Track } from "@applets/core/types"; 3 + import type { 4 + Consult, 5 + ConsultGrouping, 6 + GroupConsult, 7 + InputWorkerTasks, 8 + Track, 9 + } from "@applets/core/types"; 4 10 import { groupTracksPerScheme, initialConnections, provide } from "@scripts/common"; 5 11 6 12 //////////////////////////////////////////// ··· 62 68 Object.keys(groups).map(async (scheme) => { 63 69 if (!isSupportedScheme(scheme)) { 64 70 return { 65 - [scheme]: { available: false, reason: "Unsupported scheme" }, 71 + [scheme]: { 72 + available: false, 73 + reason: "Unsupported scheme", 74 + tracks: groups[scheme] || [], 75 + }, 66 76 }; 67 77 } 68 78 ··· 79 89 } 80 90 81 91 async function list(cachedTracks: Track[] = []) { 82 - const groups = groupTracks(cachedTracks); 92 + const groups = await groupConsult(cachedTracks); 83 93 84 94 const promises = Object.entries(groups).map( 85 - async ([scheme, cachedTracksGroup]: [string, Track[]]) => { 86 - if (!isSupportedScheme(scheme)) return cachedTracksGroup; 95 + async ([scheme, { available, tracks }]: [string, ConsultGrouping]) => { 96 + if (!available) return tracks; 97 + if (!isSupportedScheme(scheme)) return tracks; 87 98 const conn = await connections[scheme].promise; 88 - return conn.list(cachedTracksGroup); 99 + return conn.list(tracks); 89 100 }, 90 101 ); 91 102
+59 -44
src/scripts/engine/queue/worker.ts
··· 3 3 import type { Track } from "@applets/core/types.js"; 4 4 import type { Item, State } from "./types"; 5 5 import { arrayShuffle, postMessages, provide, transfer } from "@scripts/common.ts"; 6 - import { effect, signal } from "@scripts/signal"; 7 6 8 7 //////////////////////////////////////////// 9 8 // SETUP ··· 30 29 31 30 const QUEUE_SIZE = 25; 32 31 33 - const internal: { pool: Track[] } = { 34 - pool: [], 35 - }; 32 + const _internal: Record<string, { pool: Track[] }> = {}; 33 + const _state: Record<string, State> = {}; 34 + 35 + function data(groupId: string) { 36 + return state(groupId); 37 + } 38 + 39 + function emptyState(groupId: string): State { 40 + return { 41 + future: [], 42 + now: null, 43 + past: [], 44 + }; 45 + } 36 46 37 - const [future, setFuture] = signal<Item[]>([]); 38 - const [past, setPast] = signal<Item[]>([]); 39 - const [now, setNow] = signal<Item | null>(null); 47 + function notify(groupId: string) { 48 + const d = data(groupId); 40 49 41 - effect(() => { 42 50 postMessages({ 43 51 data: { 44 52 type: "data", 45 - data: state(), 53 + data: d, 54 + groupId, 46 55 }, 47 56 ports: ports.applets, 48 - transfer: getTransferables(state), 57 + transfer: getTransferables(d), 49 58 }); 50 - }); 59 + } 51 60 52 - function data() { 53 - return state(); 61 + function internal(groupId: string) { 62 + _internal[groupId] ??= { pool: [] }; 63 + return _internal[groupId]; 54 64 } 55 65 56 - function state(): State { 57 - return { 58 - future: future(), 59 - past: past(), 60 - now: now(), 61 - }; 66 + function state(groupId: string) { 67 + _state[groupId] ??= emptyState(groupId); 68 + return _state[groupId]; 62 69 } 63 70 64 71 //////////////////////////////////////////// 65 72 // ACTIONS 66 73 //////////////////////////////////////////// 67 74 68 - function add(items: Item[]) { 69 - setFuture([...future(), ...items]); 75 + function add({ groupId, items }: { groupId: string; items: Item[] }) { 76 + state(groupId).future = [...state(groupId).future, ...items]; 77 + notify(groupId); 70 78 } 71 79 72 - function pool(tracks: Track[]) { 73 - internal.pool = tracks; 80 + function pool({ groupId, tracks }: { groupId: string; tracks: Track[] }) { 81 + internal(groupId).pool = tracks; 82 + const queue = state(groupId); 74 83 75 84 // TODO: If the pool changes, only remove non-existing tracks 76 85 // instead of resetting the whole future queue. 77 86 // 78 87 // What about past queue items? 79 88 80 - setFuture([]); 81 - fill(); 89 + queue.future = []; 90 + fill(groupId); 82 91 83 92 // Automatically insert track if there isn't any 84 - if (!now()) return shift(); 93 + if (!queue.now) return shift({ groupId }); 94 + else notify(groupId); 85 95 } 86 96 87 - function shift() { 88 - const now = future()[0] ?? null; 89 - setNow(now); 97 + function shift({ groupId }: { groupId: string }) { 98 + const queue = state(groupId); 99 + const now = queue.future[0] ?? null; 100 + queue.now = now; 90 101 91 - setFuture(future().slice(1)); 92 - setPast(now ? [...past(), now] : past()); 102 + queue.future = queue.future.slice(1); 103 + queue.past = now ? [...queue.past, now] : queue.past; 93 104 94 - fill(); 105 + fill(groupId); 95 106 } 96 107 97 - function unshift() { 98 - if (past().length === 0) return; 108 + function unshift({ groupId }: { groupId: string }) { 109 + const queue = state(groupId); 110 + if (queue.past.length === 0) return; 99 111 100 - const [last] = past().splice(past().length - 1, 1); 112 + const [last] = queue.past.splice(queue.past.length - 1, 1); 101 113 const now = last ?? null; 102 114 103 - setNow(now); 104 - setFuture(now ? [now, ...future()] : future()); 115 + queue.now = now; 116 + queue.future = now ? [now, ...queue.future] : queue.future; 117 + 118 + notify(groupId); 105 119 } 106 120 107 121 // 🛠️ 108 122 109 123 // TODO: Most likely there's a more performant solution 110 - function fill() { 111 - if (future().length >= QUEUE_SIZE) return; 124 + function fill(groupId: string) { 125 + const queue = state(groupId); 126 + if (queue.future.length >= QUEUE_SIZE) return; 112 127 113 - let reducedPool = internal.pool.reduce( 128 + let reducedPool = internal(groupId).pool.reduce( 114 129 ({ past, pool }: { past: Set<string>; pool: Track[] }, track: Track) => { 115 130 if (past.has(track.id)) 116 131 return { ··· 123 138 pool: [...pool, track], 124 139 }; 125 140 }, 126 - { past: new Set(past().map((t) => t.id)), pool: [] }, 141 + { past: new Set(queue.past.map((t) => t.id)), pool: [] }, 127 142 ).pool; 128 143 129 144 if (reducedPool.length === 0) { 130 - reducedPool = internal.pool; 145 + reducedPool = internal(groupId).pool; 131 146 } 132 147 133 - const poolSelection = arrayShuffle(reducedPool).slice(0, QUEUE_SIZE - future().length); 134 - add(poolSelection); 148 + const poolSelection = arrayShuffle(reducedPool).slice(0, QUEUE_SIZE - queue.future.length); 149 + add({ groupId, items: poolSelection }); 135 150 }
+1 -1
src/scripts/input/native-fs/worker.ts
··· 55 55 const handle = handles[handleId]; 56 56 const grouping: ConsultGrouping = handle 57 57 ? { available: true, tracks } 58 - : { available: false, reason: "Handle not available" }; 58 + : { available: false, reason: "Handle not available", tracks }; 59 59 60 60 return { 61 61 key: URI.serialize({ scheme: SCHEME, host: handleId }),
+1 -1
src/scripts/input/opensubsonic/worker.ts
··· 55 55 const available = await consultServer(server); 56 56 const grouping: ConsultGrouping = available 57 57 ? { available, tracks } 58 - : { available, reason: "Server ping failed" }; 58 + : { available, reason: "Server ping failed", tracks }; 59 59 60 60 return { 61 61 key: `${SCHEME}:${serverId}`,
+1 -1
src/scripts/input/s3/worker.ts
··· 52 52 const available = await consultBucket(bucket); 53 53 const grouping: ConsultGrouping = available 54 54 ? { available, tracks } 55 - : { available, reason: "Bucket unavailable" }; 55 + : { available, reason: "Bucket unavailable", tracks }; 56 56 57 57 return { 58 58 key: `${SCHEME}:${bucketId}`,
+1 -1
src/scripts/theme/pilot/index.ts
··· 56 56 // Automatically start playing something if nothing is playing yet. 57 57 if (!audioId) { 58 58 if (isPlaying) { 59 - const now = await engine.queue.sendAction("shift"); 59 + const now = await engine.queue.sendAction("shift", { groupId: "main" }); 60 60 if (!now) { 61 61 console.warn("No tracks available yet, try again later."); 62 62 await ui.audio.sendAction("modifyIsPlaying", false);