A music player that connects to your cloud/distributed storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 71250e3e9ffd1e14fcc2db839b625bb50cffcd6c 289 lines 8.0 kB view raw
import { getTransferables } from "@okikio/transferables";
import { debounceMicrotask } from "@vicary/debounce-microtask";
import { xxh32 } from "xxh32";

import { RpcChannel } from "./worker/rpc-channel.js";

export { getTransferables } from "@okikio/transferables";

/**
 * @import {Announcement, MessengerRealm, ProxiedActions, Tunnel} from "./worker.d.ts"
 */

// Early message buffer for regular Workers.
//
// If a Worker module (or its dependencies) contains a top-level `await`, the
// browser can deliver queued incoming messages to `globalThis` while the module
// evaluation is paused — before `ostiary`/`rpc()` has had a chance to register
// a handler. Those messages would otherwise be silently dropped.
//
// This buffer captures such messages the moment this module is imported (which
// happens before any top-level `await` pause) and replays them once `ostiary`
// sets up the real handler.
//
// Detection: regular Workers are instances of DedicatedWorkerGlobalScope.
// Previously we checked `globalThis.onmessage === null`, but Safari initialises
// that property as `undefined` rather than `null`, causing the check to fail.

/** @type {MessageEvent[]} */
const _earlyMessages = [];

/**
 * Detaches the early-message listener. `null` when no buffer was installed
 * (not a dedicated worker) or once the buffer has been flushed.
 *
 * @type {null | (() => void)}
 */
let _flushEarlyMessages = null;

if (
  typeof DedicatedWorkerGlobalScope !== "undefined" &&
  globalThis instanceof DedicatedWorkerGlobalScope
) {
  const handler = /** @type {EventListener} */ ((event) => {
    _earlyMessages.push(/** @type {MessageEvent} */ (event));
  });

  globalThis.addEventListener("message", handler);

  _flushEarlyMessages = () => {
    globalThis.removeEventListener("message", handler);
  };
}

////////////////////////////////////////////
// MISC
////////////////////////////////////////////

/**
 * Manage incoming connections for a shared worker.
 * If a regular worker is used instead, it'll just execute the callback
 * immediately and then replay any messages that were buffered while the
 * module was still loading.
 *
 * For the shared-worker path a random id is stored on the context (`__id`)
 * and handed to every connection; `firstConnection` is `true` only for the
 * first `connect` event seen on that context.
 *
 * @template {MessagePort | Worker | MessengerRealm} T
 * @param {(context: MessagePort | T, firstConnection: boolean, connectionId: string) => void} callback
 * @param {T} [context] Uses `globalThis` by default.
 */
export function ostiary(
  callback,
  context = /** @type {T} */ (/** @type {unknown} */ (globalThis)),
) {
  if (
    typeof DedicatedWorkerGlobalScope !== "undefined" &&
    context instanceof DedicatedWorkerGlobalScope
  ) {
    callback(context, true, crypto.randomUUID());

    // Replay any messages that arrived before the handler was registered.
    if (_flushEarlyMessages) {
      _flushEarlyMessages();
      _flushEarlyMessages = null;

      const ctx = /** @type {EventTarget} */ (/** @type {unknown} */ (context));

      // `splice(0)` empties the buffer so the events are replayed only once.
      for (const e of _earlyMessages.splice(0)) {
        ctx.dispatchEvent(
          new MessageEvent("message", { data: e.data, ports: [...e.ports] }),
        );
      }
    }

    return;
  }

  const c = /** @type {any} */ (context);
  c.__id ??= crypto.randomUUID();

  context.addEventListener(
    "connect",
    /**
     * @param {any} event
     */
    (event) => {
      /** @type {MessagePort} */
      const port = event.ports[0];
      port.start();

      // Initiate setup
      callback(port, !(c.__initiated ?? false), c.__id);
      c.__initiated = true;
    },
  );
}

/**
 * Resolve the object to talk to a worker through: the (started) port of a
 * `SharedWorker`, or the worker itself otherwise.
 *
 * @param {Worker | SharedWorker} worker
 */
export function workerLink(worker) {
  if (isSharedWorker(worker)) {
    worker.port.start();
    return worker.port;
  }

  return worker;
}

/**
 * Create a proxy whose every property is a function that forwards the call
 * to the worker via an `RpcChannel`. The link (and channel) is created lazily
 * on the first call and reused afterwards.
 *
 * @template {Record<string, (...args: any[]) => any>} Actions
 * @param {() => MessagePort | Worker} workerLinkCreator
 * @returns {ProxiedActions<Actions>}
 */
export function workerProxy(workerLinkCreator) {
  /** @type {RpcChannel<{}, Actions> | undefined} */
  let channel;

  const proxy = new Proxy(/** @type {any} */ ({}), {
    get: (_target, /** @type {string} */ prop) => {
      /** @param {Parameters<Actions[any]>} args */
      return (...args) => {
        channel ??= new RpcChannel(workerLinkCreator());
        return channel.callMethod(prop, args);
      };
    },
  });

  return /** @type {ProxiedActions<Actions>} */ (proxy);
}

/**
 * Bridge a fresh `MessageChannel` to a worker: messages posted to the
 * returned `port` are forwarded to the worker, and messages from the worker
 * are forwarded back. The worker link is created lazily, on the first
 * message that has to be sent to it.
 *
 * The optional hooks can rewrite a message (and name its transferables)
 * before it is forwarded in either direction.
 *
 * @param {() => MessagePort | Worker | SharedWorker} workerCreator
 * @param {{ fromWorker?: (message: any) => Promise<{ data: any, transfer?: Transferable[] }>; toWorker?: (message: any) => Promise<{ data: any, transfer?: Transferable[] }> }} [hooks]
 * @returns {Tunnel}
 */
export function workerTunnel(workerCreator, hooks = {}) {
  /** @type {MessagePort | Worker | undefined} */
  let link;

  const channel = new MessageChannel();

  /**
   * @param {Event} event
   */
  const workerListener = async (event) => {
    // Receive from worker
    const msgEvent = /** @type {MessageEvent} */ (event);
    const { data, transfer } = await hooks?.fromWorker?.(msgEvent.data) ??
      { data: msgEvent.data };
    channel.port1.postMessage(data, { transfer });
  };

  function ensureLink() {
    if (link) return link;

    const workerOrLink = workerCreator();

    // NOTE(review): a bare `MessagePort` returned by `workerCreator` is not
    // started here — presumably the caller has already started it; confirm.
    link = isSharedWorker(workerOrLink)
      ? workerLink(workerOrLink)
      : workerOrLink;

    link.addEventListener("message", workerListener);

    return link;
  }

  channel.port1.addEventListener("message", async (event) => {
    // Send to worker
    const { data, transfer } = await hooks?.toWorker?.(event.data) ??
      { data: event.data };
    ensureLink().postMessage(data, { transfer });
  });

  channel.port1.start();
  channel.port2.start();

  return {
    disconnect: () => {
      link?.removeEventListener("message", workerListener);
      channel.port1.close();
      channel.port2.close();
    },
    port: channel.port2,
  };
}

////////////////////////////////////////////
// RAW
////////////////////////////////////////////

/**
 * Post a named announcement carrying `args` to the given context.
 * Transferables found in the announcement are transferred rather than cloned.
 *
 * @template T
 * @param {string} name
 * @param {T} args
 * @param {MessagePort | Worker | MessengerRealm} [context] Uses `globalThis` by default.
 */
export function announce(
  name,
  args,
  context,
) {
  const a = announcement(name, args);
  const transferables = getTransferables(a);
  (context ?? globalThis).postMessage(a, { transfer: transferables });
}

/**
 * Register `fn` as the handler for announcements called `name` arriving on
 * the given context. Registering the same name again replaces the previous
 * handler. The message listener is attached to the context only once.
 *
 * @template T
 * @param {string} name
 * @param {(args: T) => void} fn
 * @param {MessagePort | Worker | MessengerRealm} [context] Uses `globalThis` by default.
 */
export function listen(
  name,
  fn,
  context = /** @type {MessengerRealm} */ (globalThis),
) {
  const c = /** @type {any} */ (context);

  if (!c.__incoming) {
    context.addEventListener("message", incomingAnnouncementsHandler(context));

    // Null-prototype registry: names received over the wire must only ever
    // resolve to handlers registered here, never to `Object.prototype`
    // members such as `constructor` or `__proto__`.
    c.__incoming = Object.create(null);
  }

  // NOTE(review): presumably collapses bursts within one microtask into a
  // single call with the latest arguments — confirm against the
  // `@vicary/debounce-microtask` documentation.
  c.__incoming[name] = debounceMicrotask(fn, { updateArguments: true });
}

////////////////////////////////////////////
// RPC
////////////////////////////////////////////

/**
 * Open an `RpcChannel` on the given context that exposes `actions`.
 *
 * @template {Record<string, (...args: any[]) => any>} LocalAPI
 * @template {Record<string, (...args: any[]) => any>} RemoteAPI
 * @param {MessagePort | Worker | MessengerRealm} context
 * @param {RemoteAPI} actions
 * @returns {RpcChannel<{}, RemoteAPI>}
 */
export function rpc(context, actions) {
  /** @type {RpcChannel<{}, RemoteAPI>} */
  const channel = new RpcChannel(context, { expose: actions });
  return channel;
}

////////////////////////////////////////////
// ⛔️
////////////////////////////////////////////

const ANNOUNCEMENT = "announcement";

/**
 * Whether `value` is a `SharedWorker`.
 *
 * Not every runtime defines the `SharedWorker` global, and a bare
 * `instanceof SharedWorker` throws a `ReferenceError` there, so the global
 * is probed with `typeof` first.
 *
 * @param {unknown} value
 * @returns {value is SharedWorker}
 */
function isSharedWorker(value) {
  return typeof SharedWorker !== "undefined" && value instanceof SharedWorker;
}

/**
 * Build the message envelope for an announcement.
 *
 * @template T
 * @param {string} name
 * @param {T} args
 * @returns {Announcement<T>}
 */
function announcement(name, args) {
  return {
    ns: ANNOUNCEMENT,
    name,
    key: xxh32(crypto.randomUUID()),

    type: ANNOUNCEMENT,
    args,
  };
}

/**
 * Create the `message` listener that routes announcements to the handlers
 * registered on `context` via `listen`. Anything that is not an announcement
 * envelope is ignored.
 *
 * @param {MessagePort | Worker | MessengerRealm} context
 */
function incomingAnnouncementsHandler(context) {
  const c = /** @type {any} */ (context);

  /** @param {any} event */
  return (event) => {
    const data = event.data;

    // Optional chaining: other traffic on the same channel may carry a
    // `null`/`undefined` payload, which must be skipped rather than throw.
    if (data?.ns !== ANNOUNCEMENT || data?.type !== ANNOUNCEMENT) return;

    const incoming = /** @type {Announcement<any>} */ (data);
    c.__incoming[incoming.name]?.(incoming.args);
  };
}