Experiment to rebuild Diffuse using web applets.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: worker tunnel

+139 -99
+36 -14
src/pages/configurator/input/_applet.astro
··· 34 34 <script> 35 35 import type { Applet } from "@web-applets/sdk"; 36 36 37 + import type { Track } from "@applets/core/types"; 37 38 import type { Tasks } from "@scripts/configurator/input/worker"; 38 - import type { Scheme } from "@scripts/configurator/input/types"; 39 - import { CONNECTIONS } from "@scripts/configurator/input/constants"; 40 - import { applet, register } from "@scripts/applet/common"; 39 + import { applet, register, tunnel } from "@scripts/applet/common"; 41 40 import { endpoint, transfer } from "@scripts/common"; 42 41 import manifest from "./_manifest.json"; 43 42 ··· 54 53 // Register applet + worker 55 54 const context = register({ worker }); 56 55 56 + // Applet connections 57 + const input = { 58 + "file+local": applet("/input/native-fs"), 59 + opensubsonic: applet("/input/opensubsonic"), 60 + s3: applet("/input/s3"), 61 + }; 62 + 63 + // Provide tunnel to worker 64 + tunnel(worker, input); 65 + 57 66 //////////////////////////////////////////// 58 - // CONNECTIONS 67 + // ACTIONS 59 68 //////////////////////////////////////////// 60 - const connections: Record<Scheme, Promise<Applet>> = { 61 - "file+local": applet(CONNECTIONS["file+local"], { context: self }), 62 - opensubsonic: applet(CONNECTIONS.opensubsonic, { context: self }), 63 - s3: applet(CONNECTIONS.s3, { context: self }), 69 + const consult = async (fileUriOrScheme: string) => { 70 + return await worker.consult(fileUriOrScheme); 64 71 }; 65 72 66 - // Pass worker message ports to configurator worker 67 - Object.entries(connections).forEach(([scheme, promise]) => { 68 - promise.then((conn) => { 69 - return worker.manageInputWorker(scheme as Scheme, transfer(conn.ports.worker)); 70 - }); 71 - }); 73 + const contextualize = async (cachedTracks: Track[]) => { 74 + return await worker.contextualize(transfer(cachedTracks)); 75 + }; 76 + 77 + const groupConsult = async (tracks: Track[]) => { 78 + return await worker.groupConsult(transfer(tracks)); 79 + }; 80 + 81 + const list = async (cachedTracks: Track[] = []) => { 82 + return await worker.list(transfer(cachedTracks)); 83 + }; 84 + 85 + const resolve = async (args: { method: string; uri: string }) => { 86 + return await worker.resolve(args); 87 + }; 88 + 89 + context.setActionHandler("consult", consult); 90 + context.setActionHandler("contextualize", contextualize); 91 + context.setActionHandler("groupConsult", groupConsult); 92 + context.setActionHandler("list", list); 93 + context.setActionHandler("resolve", resolve); 72 94 </script>
+1 -1
src/pages/core/types.d.ts
··· 20 20 consult(fileUriOrScheme: string): Promise<Consult>; 21 21 contextualize(tracks: Track[]): Promise<void>; 22 22 groupConsult(tracks: Track[]): Promise<GroupConsult>; 23 - list(cachedTracks: Track[] = []): Promise<Track[]>; 23 + list(cachedTracks: Track[]): Promise<Track[]>; 24 24 resolve({ method, uri }: { method: string; uri: string }): Promise<ResolvedUri>; 25 25 } 26 26
+1 -1
src/pages/input/native-fs/_applet.astro
··· 55 55 return await worker.list(transfer(cachedTracks)); 56 56 }; 57 57 58 - const resolve = async (args: { uri: string }) => { 58 + const resolve = async (args: { method: string; uri: string }) => { 59 59 return await worker.resolve(args); 60 60 }; 61 61
+12 -1
src/scripts/applet/common.ts
··· 88 88 return applet; 89 89 } 90 90 91 + export function tunnel( 92 + worker: Comlink.Remote<WorkerTasks>, 93 + connections: Record<string, Applet | Promise<Applet>>, 94 + ) { 95 + Object.entries(connections).forEach(([scheme, promise]) => { 96 + Promise.resolve(promise).then((conn) => { 97 + return worker._manage(scheme, transfer(conn.ports.worker)); 98 + }); 99 + }); 100 + } 101 + 91 102 //////////////////////////////////////////// 92 103 // 🪟 Applet registration 93 104 //////////////////////////////////////////// ··· 289 300 if (options.worker !== undefined) 290 301 context.scope.onworkerport = (event) => { 291 302 if (!event.port) return; 292 - options.worker?.listenForActions(transfer(event.port)); 303 + options.worker?._listen(transfer(event.port)); 293 304 }; 294 305 295 306 return context;
+53 -9
src/scripts/common.ts
··· 13 13 //////////////////////////////////////////// 14 14 15 15 export type WorkerTasks = { 16 - listenForActions: ReturnType<typeof handleWorkerActions>; 16 + _listen: ReturnType<typeof _listen>; 17 + _manage: ReturnType<typeof _manage>; 17 18 }; 18 19 19 20 //////////////////////////////////////////// ··· 95 96 return window.self !== window.top; 96 97 } 97 98 99 + export function initialConnections<C extends Record<string, any>>(ids: string[]) { 100 + const connections: Record<string, PromiseWithResolvers<Comlink.Remote<C>>> = {}; 101 + 102 + ids.forEach((c) => { 103 + connections[c] = Promise.withResolvers<Comlink.Remote<C>>(); 104 + }); 105 + 106 + return connections; 107 + } 108 + 98 109 export function isPrimitive(test: unknown) { 99 110 return test !== Object(test); 100 111 } ··· 107 118 return new TextEncoder().encode(JSON.stringify(a)); 108 119 } 109 120 110 - export function provide<A extends Record<string, any>, B extends Record<string, any>>( 111 - actions: A, 112 - tasks: B = {} as B, 113 - ) { 114 - return expose<WorkerTasks & B>({ 115 - listenForActions: handleWorkerActions(actions), 116 - ...tasks, 121 + export function provide< 122 + C extends Record<string, any>, 123 + A extends Record<string, any>, 124 + T extends Record<string, any>, 125 + >({ 126 + actions, 127 + connections, 128 + tasks, 129 + }: { 130 + actions?: A; 131 + connections?: Record<string, PromiseWithResolvers<Comlink.Remote<C>>>; 132 + tasks?: T; 133 + }) { 134 + const allTasks = expose<WorkerTasks & T>({ 135 + _listen: _listen<A>(actions || ({} as A)), 136 + _manage: _manage<C>(connections || {}), 137 + ...(tasks || ({} as T)), 117 138 }); 139 + 140 + return { 141 + connections: connections || ({} as Record<string, PromiseWithResolvers<Comlink.Remote<C>>>), 142 + tasks: allTasks, 143 + }; 118 144 } 119 145 120 146 export async function trackArtworkCacheId(track: Track): Promise<string> { ··· 130 156 131 157 // PRIVATE 132 158 133 - function handleWorkerActions<A extends Record<string, any>>(actions: A) { 159 + function _listen<A extends Record<string, any>>(actions: A) { 134 160 async function handleAction( 135 161 port: MessagePort, 136 162 action: { ··· 168 194 }; 169 195 }; 170 196 } 197 + 198 + function _manage<C extends Record<string, any>>( 199 + connections: Record<string, PromiseWithResolvers<Comlink.Remote<C>>>, 200 + ) { 201 + console.log(connections); 202 + 203 + return (connectionId: string, workerPort: MessagePort) => { 204 + let conn = connections[connectionId]; 205 + const remote = endpoint<C>(workerPort); 206 + 207 + if (!conn) { 208 + connections[connectionId] = Promise.withResolvers<Comlink.Remote<C>>(); 209 + conn = connections[connectionId]; 210 + } 211 + 212 + conn.resolve(remote); 213 + }; 214 + }
-6
src/scripts/configurator/input/common.ts
··· 1 - import { CONNECTIONS } from "./constants"; 2 - import type { Scheme } from "./types"; 3 - 4 - export function isSupportedScheme(scheme: string): scheme is Scheme { 5 - return !!CONNECTIONS[scheme as Scheme]; 6 - }
-5
src/scripts/configurator/input/constants.ts
··· 1 - export const CONNECTIONS = { 2 - "file+local": "/input/native-fs", 3 - opensubsonic: "/input/opensubsonic", 4 - s3: "/input/s3", 5 - };
-3
src/scripts/configurator/input/types.d.ts
··· 1 - import type { CONNECTIONS } from "./constants"; 2 - 3 - export type Scheme = keyof typeof CONNECTIONS;
+20 -47
src/scripts/configurator/input/worker.ts
··· 1 1 import * as URI from "uri-js"; 2 2 3 - import type { 4 - Consult, 5 - ConsultGrouping, 6 - GroupConsult, 7 - InputWorkerTasks, 8 - Track, 9 - } from "@applets/core/types"; 10 - import type { Scheme } from "./types"; 11 - import { CONNECTIONS } from "./constants"; 12 - import { endpoint, groupTracksPerScheme, provide, transfer } from "@scripts/common"; 13 - import { isSupportedScheme } from "./common"; 14 - import type { Remote } from "comlink"; 3 + import type { Consult, GroupConsult, InputWorkerTasks, Track } from "@applets/core/types"; 4 + import { groupTracksPerScheme, initialConnections, provide } from "@scripts/common"; 15 5 16 6 //////////////////////////////////////////// 17 - // TASKS 7 + // SETUP 18 8 //////////////////////////////////////////// 19 - const tasks = provide( 20 - { 21 - consult, 22 - contextualize, 23 - groupConsult, 24 - list, 25 - resolve, 26 - }, 27 - { 28 - manageInputWorker, 29 - }, 30 - ); 9 + const actions = { 10 + consult, 11 + contextualize, 12 + groupConsult, 13 + list, 14 + resolve, 15 + }; 16 + 17 + const { connections, tasks } = provide({ 18 + actions, 19 + connections: initialConnections<InputWorkerTasks>(["file+local", "opensubsonic", "s3"]), 20 + tasks: actions, 21 + }); 31 22 23 + export type Actions = typeof actions; 32 24 export type Tasks = typeof tasks; 33 25 34 - //////////////////////////////////////////// 35 - // CONNECTIONS 36 - //////////////////////////////////////////// 37 - const connections: Record< 38 - Scheme, 39 - { 40 - promise: Promise<Remote<InputWorkerTasks>>; 41 - resolve: (value: Remote<InputWorkerTasks>) => void; 42 - reject: () => void; 43 - } 44 - > = { 45 - "file+local": Promise.withResolvers(), 46 - opensubsonic: Promise.withResolvers(), 47 - s3: Promise.withResolvers(), 48 - }; 49 - 50 - function manageInputWorker(scheme: Scheme, port: MessagePort) { 51 - // TODO: Not sure what happens here when it's already resolved. 52 - // I guess ideally the connections record should always 53 - // provide the latest incoming value from here. 54 - connections[scheme].resolve(endpoint<InputWorkerTasks>(port)); 26 + function isSupportedScheme(scheme: string) { 27 + return !!connections[scheme]; 55 28 } 56 29 57 30 //////////////////////////////////////////// ··· 142 115 const grouped = groupTracksPerScheme( 143 116 tracks, 144 117 Object.fromEntries( 145 - Object.entries(CONNECTIONS).map(([k, _v]) => { 118 + Object.entries(connections).map(([k, _v]) => { 146 119 return [k, []]; 147 120 }), 148 121 ),
+2 -2
src/scripts/input/native-fs/worker.ts
··· 9 9 recursiveList, 10 10 trackHandleId, 11 11 } from "./common"; 12 - import { expose, provide, transfer } from "@scripts/common"; 12 + import { provide, transfer } from "@scripts/common"; 13 13 14 14 //////////////////////////////////////////// 15 15 // TASKS ··· 22 22 resolve, 23 23 }; 24 24 25 - const tasks = provide(actions, actions); 25 + const tasks = provide({ actions, tasks: actions }); 26 26 27 27 export type Actions = typeof actions; 28 28 export type Tasks = typeof tasks;
+2 -2
src/scripts/input/opensubsonic/worker.ts
··· 14 14 serverId, 15 15 serversFromTracks, 16 16 } from "./common.ts"; 17 - import { expose, provide, transfer } from "@scripts/common.ts"; 17 + import { provide, transfer } from "@scripts/common.ts"; 18 18 19 19 //////////////////////////////////////////// 20 20 // TASKS ··· 27 27 resolve, 28 28 }; 29 29 30 - const tasks = provide(actions, actions); 30 + const tasks = provide({ actions, tasks: actions }); 31 31 32 32 export type Actions = typeof actions; 33 33 export type Tasks = typeof tasks;
+2 -2
src/scripts/input/s3/worker.ts
··· 10 10 loadBuckets, 11 11 parseURI, 12 12 } from "./common"; 13 - import { expose, provide, transfer } from "@scripts/common"; 13 + import { provide, transfer } from "@scripts/common"; 14 14 import { SCHEME } from "./constants"; 15 15 16 16 //////////////////////////////////////////// ··· 24 24 resolve, 25 25 }; 26 26 27 - const tasks = provide(actions, actions); 27 + const tasks = provide({ actions, tasks: actions }); 28 28 29 29 export type Actions = typeof actions; 30 30 export type Tasks = typeof tasks;
+10 -6
src/scripts/processor/artwork/worker.ts
··· 2 2 import * as IDB from "idb-keyval"; 3 3 4 4 import type { Artwork, ArtworkRequest } from "./types"; 5 - import { expose, provide, transfer } from "@scripts/common"; 5 + import { provide } from "@scripts/common"; 6 6 import { IDB_ARTWORK_PREFIX } from "./constants"; 7 7 import { musicMetadataTags } from "../metadata/common"; 8 - import { getTransferables } from "@okikio/transferables"; 9 8 10 9 // State 11 10 let queue: ArtworkRequest[] = []; 12 11 13 12 //////////////////////////////////////////// 14 - // ACTIONS 13 + // SETUP 15 14 //////////////////////////////////////////// 16 - provide({ 15 + 16 + const actions = { 17 17 artwork, 18 18 supply, 19 - }); 19 + }; 20 20 21 - // Actions 21 + provide({ actions }); 22 + 23 + //////////////////////////////////////////// 24 + // ACTIONS 25 + //////////////////////////////////////////// 22 26 23 27 async function artwork(request: ArtworkRequest) { 24 28 const art = await processRequest(request);