Experiment to rebuild Diffuse using web applets.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: move queue state to worker

+342 -169
+2 -2
src/pages/constituent/blur/artwork-controller/_applet.astro
··· 742 742 } 743 743 744 744 function previous() { 745 - engine.queue.sendAction("unshift"); 745 + engine.queue.sendAction("unshift", undefined, { worker: true }); 746 746 } 747 747 748 748 function next() { 749 - engine.queue.sendAction("shift"); 749 + engine.queue.sendAction("shift", undefined, { worker: true }); 750 750 } 751 751 752 752 controller.appendChild(Controls);
+15 -12
src/pages/engine/queue/_applet.astro
··· 4 4 import type { State } from "./types.d.ts"; 5 5 6 6 import { register } from "@scripts/applet/common"; 7 - import { endpoint, SharedWorker, transfer } from "@scripts/common"; 7 + import { endpoint, SharedWorker, sync, transfer } from "@scripts/common"; 8 8 import manifest from "./_manifest.json"; 9 9 10 10 //////////////////////////////////////////// 11 11 // SETUP 12 12 //////////////////////////////////////////// 13 - const worker = endpoint<Tasks>( 14 - new SharedWorker(new URL("../../../scripts/engine/queue/worker", import.meta.url), { 15 - type: "module", 16 - name: manifest.name, 17 - }).port, 18 - ); 13 + const port = new SharedWorker(new URL("../../../scripts/engine/queue/worker", import.meta.url), { 14 + type: "module", 15 + name: manifest.name, 16 + }).port; 17 + 18 + const worker = endpoint<Tasks>(port); 19 19 20 20 // Register applet 21 - const context = register<State>({ worker }); 21 + const context = register<State>({ mode: "shared-worker", worker }); 22 + 23 + // Keep applet data with worker data in sync 24 + sync(context, port); 22 25 23 26 // Initial state 24 27 context.data = { ··· 36 39 context.setActionHandler("unshift", unshift); 37 40 38 41 async function add(items: Track[]) { 39 - context.data = await worker.add(transfer(items)); 42 + await worker.add(transfer(items)); 40 43 } 41 44 42 45 async function pool(items: Track[]) { 43 - context.data = await worker.pool(transfer(items)); 46 + await worker.pool(transfer(items)); 44 47 } 45 48 46 49 async function shift() { 47 - context.data = await worker.shift(); 50 + await worker.shift(); 48 51 } 49 52 50 53 async function unshift() { 51 - context.data = await worker.unshift(); 54 + await worker.unshift(); 52 55 } 53 56 </script>
+1 -1
src/pages/orchestrator/queue-audio/_applet.astro
··· 41 41 engine.audio, 42 42 (data) => data.items[engine.queue.data.now?.id ?? Infinity]?.hasEnded ?? false, 43 43 (hasEnded) => { 44 - if (hasEnded) engine.queue.sendAction("shift"); 44 + if (hasEnded) engine.queue.sendAction("shift", undefined, { worker: true }); 45 45 }, 46 46 ); 47 47
+1
src/pages/orchestrator/queue-tracks/_applet.astro
··· 49 49 // Clear 50 50 engine.queue.sendAction("pool", tracks, { 51 51 timeoutDuration: 60000, 52 + worker: true, 52 53 }); 53 54 }, 54 55 );
+143 -93
src/scripts/applet/common.ts
··· 1 1 import type { Applet, AppletEvent, AppletScope } from "@web-applets/sdk"; 2 - import type * as Comlink from "comlink"; 2 + import * as Comlink from "comlink"; 3 3 4 4 import { applets } from "@web-applets/sdk"; 5 5 import { type ElementConfigurator, h } from "spellcaster/hyperscript.js"; ··· 102 102 //////////////////////////////////////////// 103 103 // 🪟 Applet registration 104 104 //////////////////////////////////////////// 105 - export type BroadcastedApplet<T> = { 105 + export type DiffuseApplet<T> = { 106 106 groupId: string | undefined; 107 107 scope: AppletScope<T>; 108 108 ··· 111 111 get instanceId(): string; 112 112 set data(data: T); 113 113 114 - codec: { 115 - decode(data: any): T; 116 - encode(data: T): any; 117 - }; 114 + codec: Codec<T>; 118 115 119 - isMainInstance(): boolean; 116 + isMainInstance(): boolean | null; 120 117 setActionHandler<H extends Function>(actionId: string, actionHandler: H): void; 121 118 }; 122 119 120 + export type Codec<T> = { 121 + decode(data: any): T; 122 + encode(data: T): any; 123 + }; 124 + 123 125 export function register<DataType = any>( 124 - options: { worker?: Comlink.Remote<WorkerTasks> } = {}, 125 - ): BroadcastedApplet<DataType> { 126 + options: { mode?: "broadcast" | "shared-worker"; worker?: Comlink.Remote<WorkerTasks> } = {}, 127 + ): DiffuseApplet<DataType> { 128 + const mode = options.mode ?? "broadcast"; 126 129 const url = new URL(location.href); 127 130 const scope = applets.register<DataType>(); 128 131 ··· 130 133 const channelId = `${location.host}${location.pathname}/${groupId}`; 131 134 const instanceId = crypto.randomUUID(); 132 135 133 - let isMainInstance = true; 136 + // Codec 137 + const codec = { 138 + decode: (data: any) => data as DataType, 139 + encode: (data: DataType) => data as any, 140 + }; 141 + 142 + // Channel 143 + const channelContext = 144 + mode === "broadcast" 145 + ? broadcastChannel({ 146 + channelId, 147 + codec, 148 + instanceId, 149 + scope, 150 + }) 151 + : undefined; 152 + 153 + // Context 154 + const context: DiffuseApplet<DataType> = { 155 + groupId, 156 + scope, 157 + 158 + settled() { 159 + return channelContext?.promise.then(() => {}) ?? Promise.resolve(); 160 + }, 161 + 162 + get instanceId() { 163 + return instanceId; 164 + }, 165 + 166 + get data() { 167 + return scope.data; 168 + }, 169 + 170 + set data(data: DataType) { 171 + scope.data = data; 172 + }, 173 + 174 + codec, 175 + 176 + isMainInstance() { 177 + return channelContext?.mainSignal[0]() ?? null; 178 + }, 179 + 180 + setActionHandler: <H extends Function>(actionId: string, actionHandler: H) => { 181 + switch (mode) { 182 + case "broadcast": 183 + return channelContext?.setActionHandler(actionId, actionHandler); 184 + 185 + case "shared-worker": 186 + return scope.setActionHandler(actionId, actionHandler); 187 + } 188 + }, 189 + }; 190 + 191 + if (options.worker) { 192 + context.scope.onworkerport = (event) => { 193 + if (!event.port) return; 194 + options.worker?._listen(transfer(event.port)); 195 + }; 196 + } 197 + 198 + return context; 199 + } 200 + 201 + function broadcastChannel<DataType>({ 202 + channelId, 203 + codec, 204 + instanceId, 205 + scope, 206 + }: { 207 + channelId: string; 208 + codec: Codec<DataType>; 209 + instanceId: string; 210 + scope: AppletScope<DataType>; 211 + }) { 212 + const mainSignal = signal<boolean>(true); 213 + const [isMain, setIsMain] = mainSignal; 134 214 135 215 // One instance to rule them all 136 216 // ··· 149 229 instanceId: event.data.instanceId, 150 230 }); 151 231 152 - if (isMainInstance) { 232 + if (isMain()) { 153 233 channel.postMessage({ 154 234 type: "data", 155 - data: context.codec.encode(scope.data), 235 + data: codec.encode(scope.data), 156 236 }); 157 237 } 158 238 break; ··· 160 240 161 241 case "PONG": { 162 242 if (event.data.instanceId === instanceId) { 163 - isMainInstance = false; 243 + setIsMain(false); 164 244 } 165 245 break; 166 246 } 167 247 168 248 case "action": { 169 - if (isMainInstance) { 249 + if (isMain()) { 170 250 const result = await scope.actionHandlers[event.data.actionId]?.(...event.data.arguments); 171 251 channel.postMessage({ 172 252 type: "actioncomplete", ··· 178 258 } 179 259 180 260 case "data": { 181 - scope.data = context.codec.decode(event.data.data); 261 + scope.data = codec.decode(event.data.data); 182 262 break; 183 263 } 184 264 } ··· 206 286 207 287 const promise = makeMainPromise(); 208 288 209 - // Send out ping 210 - channel.postMessage({ 211 - type: "PING", 212 - instanceId, 213 - }); 214 - 215 289 // If the data on the main instance changes, 216 290 // pass it on to other instances. 217 291 scope.addEventListener("data", async (event: AppletEvent) => { 218 292 await promise; 219 293 220 - if (isMainInstance) { 294 + if (isMain()) { 221 295 channel.postMessage({ 222 296 type: "data", 223 - data: context.codec.encode(event.data), 297 + data: codec.encode(event.data), 224 298 }); 225 299 } 226 300 }); 227 301 228 - // Context 229 - const context: BroadcastedApplet<DataType> = { 230 - groupId, 231 - scope, 302 + // Send out ping 303 + channel.postMessage({ 304 + type: "PING", 305 + instanceId, 306 + }); 232 307 233 - settled() { 234 - return promise.then(() => {}); 235 - }, 308 + // Action handler 309 + const setActionHandler = <H extends Function>(actionId: string, actionHandler: H) => { 310 + const handler = async (...args: any) => { 311 + if (isMain()) { 312 + return actionHandler(...args); 313 + } 236 314 237 - get instanceId() { 238 - return instanceId; 239 - }, 315 + // Check if a main instance is still available, 316 + // if not, then this is the new main. 317 + const promised = await makeMainPromise(); 318 + setIsMain(promised.isMain); 240 319 241 - get data() { 242 - return scope.data; 243 - }, 320 + if (isMain()) { 321 + return actionHandler(...args); 322 + } 244 323 245 - set data(data: DataType) { 246 - scope.data = data; 247 - }, 248 - 249 - codec: { 250 - decode: (data: any) => data as DataType, 251 - encode: (data: DataType) => data as any, 252 - }, 253 - 254 - isMainInstance() { 255 - return isMainInstance; 256 - }, 257 - 258 - setActionHandler: <H extends Function>(actionId: string, actionHandler: H) => { 259 - const handler = async (...args: any) => { 260 - if (isMainInstance) { 261 - return actionHandler(...args); 262 - } 263 - 264 - // Check if a main instance is still available, 265 - // if not, then this is the new main. 266 - const { isMain } = await makeMainPromise(); 267 - isMainInstance = isMain; 268 - 269 - if (isMainInstance) { 270 - return actionHandler(...args); 271 - } 324 + const actionMessage = { 325 + actionInstanceId: crypto.randomUUID(), 326 + actionId, 327 + type: "action", 328 + arguments: args, 329 + }; 272 330 273 - const actionMessage = { 274 - actionInstanceId: crypto.randomUUID(), 275 - actionId, 276 - type: "action", 277 - arguments: args, 331 + return await new Promise((resolve) => { 332 + const actionCallback = (event: MessageEvent) => { 333 + if ( 334 + event.data?.type === "actioncomplete" && 335 + event.data?.actionInstanceId === actionMessage.actionInstanceId 336 + ) { 337 + channel.removeEventListener("message", actionCallback); 338 + resolve(event.data.result); 339 + } 278 340 }; 279 341 280 - return await new Promise((resolve) => { 281 - const actionCallback = (event: MessageEvent) => { 282 - if ( 283 - event.data?.type === "actioncomplete" && 284 - event.data?.actionInstanceId === actionMessage.actionInstanceId 285 - ) { 286 - channel.removeEventListener("message", actionCallback); 287 - resolve(event.data.result); 288 - } 289 - }; 342 + channel.addEventListener("message", actionCallback); 343 + channel.postMessage(actionMessage); 344 + }); 345 + }; 290 346 291 - channel.addEventListener("message", actionCallback); 292 - channel.postMessage(actionMessage); 293 - }); 294 - }; 347 + scope.setActionHandler(actionId, handler); 348 + }; 295 349 296 - scope.setActionHandler(actionId, handler); 297 - }, 350 + // Fin 351 + return { 352 + channel, 353 + mainSignal, 354 + promise, 355 + setActionHandler, 298 356 }; 299 - 300 - if (options.worker !== undefined) 301 - context.scope.onworkerport = (event) => { 302 - if (!event.port) return; 303 - options.worker?._listen(transfer(event.port)); 304 - }; 305 - 306 - return context; 307 357 } 308 358 309 359 //////////////////////////////////////////// ··· 326 376 }); 327 377 } 328 378 329 - export function makeConnect<X>(context: BroadcastedApplet<X>) { 379 + export function makeConnect<X>(context: DiffuseApplet<X>) { 330 380 return <D, T>(applet: Applet<D>, dataFn: (data: D) => T, effectFn: (t: T) => void) => { 331 381 return reactive(applet, dataFn, (t: T) => { 332 382 if (context.isMainInstance()) effectFn(t);
+57 -7
src/scripts/common.ts
··· 4 4 import { getTransferables } from "@okikio/transferables"; 5 5 6 6 import type { Track } from "@applets/core/types"; 7 + import type { DiffuseApplet } from "./applet/common"; 7 8 8 9 // export { SharedWorkerPolyfill as SharedWorker } from "@okikio/sharedworker"; 9 10 export const SharedWorker = globalThis.SharedWorker; ··· 66 67 return e; 67 68 } 68 69 69 - export function expose<A extends Record<string, any>>(tasks: A): A { 70 + export function expose<A extends Record<string, any>>( 71 + tasks: A, 72 + opts?: { 73 + ports?: { 74 + applets: MessagePort[]; 75 + consumers: MessagePort[]; 76 + }; 77 + }, 78 + ): A { 70 79 if (globalThis.SharedWorkerGlobalScope && self instanceof SharedWorkerGlobalScope) { 71 80 self.onconnect = (event: MessageEvent) => { 72 81 const port = event.ports[0]; 82 + opts?.ports?.applets?.push(port); 73 83 Comlink.expose(tasks, port); 74 84 port.start(); 75 85 }; ··· 118 128 return new TextEncoder().encode(JSON.stringify(a)); 119 129 } 120 130 131 + export function postMessages<D, T>({ 132 + data, 133 + ports, 134 + transfer, 135 + }: { 136 + data: D; 137 + ports: MessagePort[]; 138 + transfer?: Transferable[]; 139 + }) { 140 + ports.forEach((port) => { 141 + port.postMessage(data, transfer ?? []); 142 + }); 143 + } 144 + 121 145 export function provide< 122 146 C extends Record<string, any>, 123 147 A extends Record<string, any>, ··· 131 155 connections?: Record<string, PromiseWithResolvers<Comlink.Remote<C>>>; 132 156 tasks?: T; 133 157 }) { 134 - const allTasks = expose<WorkerTasks & T>({ 135 - _listen: _listen<A>(actions || ({} as A)), 136 - _manage: _manage<C>(connections || {}), 137 - ...(tasks || ({} as T)), 138 - }); 158 + const portsHolder = { 159 + applets: [] as MessagePort[], 160 + consumers: [] as MessagePort[], 161 + }; 162 + 163 + const allTasks = expose<WorkerTasks & T>( 164 + { 165 + _listen: _listen<A>(actions || ({} as A), portsHolder), 166 + _manage: _manage<C>(connections || {}), 167 + ...(tasks || ({} as T)), 168 + }, 169 + { 170 + ports: portsHolder, 171 + }, 172 + ); 139 173 140 174 return { 141 175 connections: connections || ({} as Record<string, PromiseWithResolvers<Comlink.Remote<C>>>), 176 + ports: portsHolder, 142 177 tasks: allTasks, 143 178 }; 144 179 } 145 180 181 + export function sync<DataType = unknown>(context: DiffuseApplet<DataType>, port: MessagePort) { 182 + port.onmessage = (event) => { 183 + if (event.data?.type === "data") { 184 + context.data = event.data.data; 185 + } 186 + }; 187 + } 188 + 146 189 export async function trackArtworkCacheId(track: Track): Promise<string> { 147 190 return await crypto.subtle 148 191 .digest("SHA-256", new TextEncoder().encode(track.uri)) ··· 156 199 157 200 // PRIVATE 158 201 159 - function _listen<A extends Record<string, any>>(actions: A) { 202 + function _listen<A extends Record<string, any>>( 203 + actions: A, 204 + portsHolder: { 205 + applets: MessagePort[]; 206 + consumers: MessagePort[]; 207 + }, 208 + ) { 160 209 async function handleAction( 161 210 port: MessagePort, 162 211 action: { ··· 185 234 186 235 return (port: MessagePort) => { 187 236 Comlink.expose(actions, port); 237 + portsHolder.consumers.push(port); 188 238 189 239 port.onmessage = async (message) => { 190 240 switch (message.data?.type) {
+56 -52
src/scripts/engine/queue/worker.ts
··· 1 + import { getTransferables } from "@okikio/transferables"; 2 + 1 3 import type { Track } from "@applets/core/types.js"; 2 4 import type { Item, State } from "./types"; 3 - import { arrayShuffle, provide, transfer } from "@scripts/common.ts"; 4 - 5 - //////////////////////////////////////////// 6 - // STATE 7 - //////////////////////////////////////////// 8 - 9 - const QUEUE_SIZE = 25; 10 - 11 - const internal: { pool: Track[] } = { 12 - pool: [], 13 - }; 14 - 15 - const state: State = { 16 - future: [], 17 - past: [], 18 - now: null, 19 - }; 20 - 21 - function data() { 22 - return transfer({ ...state }); 23 - } 5 + import { arrayShuffle, postMessages, provide, transfer } from "@scripts/common.ts"; 6 + import { effect, signal } from "@scripts/signal"; 24 7 25 8 //////////////////////////////////////////// 26 9 // SETUP ··· 33 16 unshift, 34 17 }; 35 18 36 - const { tasks } = provide({ 19 + const { ports, tasks } = provide({ 37 20 actions, 38 21 tasks: actions, 39 22 }); ··· 42 25 export type Tasks = typeof tasks; 43 26 44 27 //////////////////////////////////////////// 45 - // ACTIONS 28 + // STATE 46 29 //////////////////////////////////////////// 47 30 48 - function add(items: Item[]): State { 49 - state.future = [...state.future, ...items]; 31 + const QUEUE_SIZE = 25; 32 + 33 + const internal: { pool: Track[] } = { 34 + pool: [], 35 + }; 36 + 37 + const [future, setFuture] = signal<Item[]>([]); 38 + const [past, setPast] = signal<Item[]>([]); 39 + const [now, setNow] = signal<Item | null>(null); 40 + 41 + effect(() => { 42 + const state: State = { 43 + future: future(), 44 + past: past(), 45 + now: now(), 46 + }; 50 47 51 - // Fin 52 - return data(); 48 + postMessages({ 49 + data: { 50 + type: "data", 51 + data: state, 52 + }, 53 + ports: ports.applets, 54 + transfer: getTransferables(state), 55 + }); 56 + }); 57 + 58 + //////////////////////////////////////////// 59 + // ACTIONS 60 + //////////////////////////////////////////// 61 + 62 + function add(items: Item[]) { 63 + setFuture([...future(), ...items]); 53 64 } 54 65 55 - function pool(tracks: Track[]): State { 66 + function pool(tracks: Track[]) { 56 67 internal.pool = tracks; 57 68 58 69 // TODO: If the pool changes, only remove non-existing tracks ··· 60 71 // 61 72 // What about past queue items? 62 73 63 - state.future = []; 74 + setFuture([]); 64 75 fill(); 65 76 66 77 // Automatically insert track if there isn't any 67 - if (!state.now) return shift(); 68 - 69 - // Fin 70 - return data(); 78 + if (!now()) return shift(); 71 79 } 72 80 73 - function shift(): State { 74 - state.now = state.future[0] || null; 75 - state.future = state.future.slice(1); 76 - state.past = state.now ? [...state.past, state.now] : state.past; 81 + function shift() { 82 + const now = future()[0] ?? null; 83 + setNow(now); 77 84 78 - fill(); 85 + setFuture(future().slice(1)); 86 + setPast(now ? [...past(), now] : past()); 79 87 80 - // Fin 81 - return data(); 88 + fill(); 82 89 } 83 90 84 - function unshift(): State { 85 - if (state.past.length === 0) return state; 91 + function unshift() { 92 + if (past().length === 0) return; 86 93 87 - const past = [...state.past]; 88 - const [last] = past.splice(past.length - 1, 1); 89 - state.now = last ?? null; 90 - state.future = state.now ? [state.now, ...state.future] : state.future; 94 + const [last] = past().splice(past().length - 1, 1); 95 + const now = last ?? null; 91 96 92 - // Fin 93 - return data(); 97 + setNow(now); 98 + setFuture(now ? [now, ...future()] : future()); 94 99 } 95 100 96 101 // 🛠️ 97 102 98 103 // TODO: Most likely there's a more performant solution 99 104 function fill() { 100 - if (state.future.length >= QUEUE_SIZE) return state; 105 + if (future().length >= QUEUE_SIZE) return; 101 106 102 107 let reducedPool = internal.pool.reduce( 103 108 ({ past, pool }: { past: Set<string>; pool: Track[] }, track: Track) => { ··· 112 117 pool: [...pool, track], 113 118 }; 114 119 }, 115 - { past: new Set(state.past.map((t) => t.id)), pool: [] }, 120 + { past: new Set(past().map((t) => t.id)), pool: [] }, 116 121 ).pool; 117 122 118 123 if (reducedPool.length === 0) { 119 124 reducedPool = internal.pool; 120 125 } 121 126 122 - const poolSelection = arrayShuffle(reducedPool).slice(0, QUEUE_SIZE - state.future.length); 123 - 127 + const poolSelection = arrayShuffle(reducedPool).slice(0, QUEUE_SIZE - future().length); 124 128 add(poolSelection); 125 129 }
+2 -2
src/scripts/output/common.ts
··· 1 1 import { xxh32r } from "xxh32/dist/raw.js"; 2 2 3 3 import type { ManagedOutput, Track } from "@applets/core/types"; 4 - import type { BroadcastedApplet } from "@scripts/applet/common"; 4 + import type { DiffuseApplet } from "@scripts/applet/common"; 5 5 import { jsonEncode } from "@scripts/common"; 6 6 7 7 export const INITIAL_MANAGED_OUTPUT: ManagedOutput = { ··· 13 13 }; 14 14 15 15 export function outputManager<DataType>(args: { 16 - context: BroadcastedApplet<DataType>; 16 + context: DiffuseApplet<DataType>; 17 17 /* Indicate if the initial data loader may proceed. */ 18 18 init?: () => Promise<boolean>; 19 19 tracks: {
+65
src/scripts/signal.ts
··· 1 + import { Signal } from "signal-polyfill"; 2 + 3 + // SIGNAL 4 + 5 + export type Signal<T> = () => T; 6 + 7 + export const signal = <T>(initial: T): [Signal<T>, (value: T) => void] => { 8 + const state = new Signal.State(initial); 9 + const get = () => state.get(); 10 + const set = (value: T) => state.set(value); 11 + return [get, set]; 12 + }; 13 + 14 + // EFFECT 15 + 16 + export const throttled = ( 17 + job: () => void, 18 + queue: (callback: () => void) => void = queueMicrotask, 19 + ): (() => void) => { 20 + let isScheduled = false; 21 + 22 + const perform = () => { 23 + job(); 24 + isScheduled = false; 25 + }; 26 + 27 + const schedule = () => { 28 + if (!isScheduled) { 29 + isScheduled = true; 30 + queue(perform); 31 + } 32 + }; 33 + 34 + return schedule; 35 + }; 36 + 37 + const watcher = new Signal.subtle.Watcher( 38 + throttled(() => { 39 + for (const signal of watcher.getPending()) { 40 + signal.get(); 41 + } 42 + watcher.watch(); 43 + }), 44 + ); 45 + 46 + export type Cancel = () => void; 47 + 48 + export const effect = (perform: () => Cancel | void) => { 49 + let cleanup: Cancel | undefined; 50 + 51 + const signal = new Signal.Computed(() => { 52 + cleanup?.(); 53 + cleanup = perform() ?? undefined; 54 + }); 55 + 56 + watcher.watch(signal); 57 + signal.get(); 58 + 59 + const dispose = () => { 60 + cleanup?.(); 61 + watcher.unwatch(signal); 62 + }; 63 + 64 + return dispose; 65 + };