A music player that connects to your cloud/distributed storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 7a9e27652e2c620bcb60a2ead5680498c036abb0 282 lines 7.7 kB view raw
import { getTransferables } from "@okikio/transferables";
import { debounceMicrotask } from "@vicary/debounce-microtask";
import { xxh32 } from "xxh32";

import { RpcChannel } from "./worker/rpc-channel.js";

export { getTransferables } from "@okikio/transferables";

/**
 * @import {Announcement, MessengerRealm, ProxiedActions, Tunnel} from "./worker.d.ts"
 */

// Early message buffer for regular Workers.
//
// If a Worker module (or its dependencies) contains a top-level `await`, the
// browser can deliver queued incoming messages to `globalThis` while the module
// evaluation is paused — before `ostiary`/`rpc()` has had a chance to register
// a handler. Those messages would otherwise be silently dropped.
//
// This buffer captures such messages the moment this module is imported (which
// happens before any top-level `await` pause) and replays them once `ostiary`
// sets up the real handler.
//
// Detection: regular Workers have `globalThis.onmessage === null`; the main
// thread and SharedWorkers do not.
//
// NOTE(review): `Window` also exposes an `onmessage` handler attribute that
// defaults to `null`, so this check may match on the main thread as well —
// confirm whether buffering window messages there is intended.

/** @type {MessageEvent[]} */
const _earlyMessages = [];

/**
 * Detaches the buffering listener. `null` when no buffer was installed or
 * once the buffer has been flushed by `ostiary`.
 *
 * @type {null | (() => void)}
 */
let _flushEarlyMessages = null;

if (/** @type {any} */ (globalThis).onmessage === null) {
  const handler = /** @type {EventListener} */ ((event) => {
    _earlyMessages.push(/** @type {MessageEvent} */ (event));
  });

  globalThis.addEventListener("message", handler);

  _flushEarlyMessages = () => {
    globalThis.removeEventListener("message", handler);
  };
}

////////////////////////////////////////////
// MISC
////////////////////////////////////////////

/**
 * Manage incoming connections for a shared worker.
 * If a regular worker is used instead, it'll just execute the callback immediately.
 *
 * A context whose `onmessage` is `null` takes the immediate path: the callback
 * runs once with `firstConnection = true` and a fresh connection id, after
 * which any buffered early messages are re-dispatched on the context.
 * Any other context gets a `connect` listener; every connecting port is
 * started and handed to the callback together with an id that is stored on
 * the context (`__id`) and reused across connections.
 *
 * @template {MessagePort | Worker | MessengerRealm} T
 * @param {(context: MessagePort | T, firstConnection: boolean, connectionId: string) => void} callback
 * @param {T} [context] Uses `globalThis` by default.
 */
export function ostiary(
  callback,
  context = /** @type {T} */ (/** @type {unknown} */ (globalThis)),
) {
  if (/** @type {any} */ (context).onmessage === null) {
    callback(context, true, crypto.randomUUID());

    // Replay any messages that arrived before the handler was registered.
    if (_flushEarlyMessages) {
      _flushEarlyMessages();
      _flushEarlyMessages = null;
      const ctx = /** @type {EventTarget} */ (/** @type {unknown} */ (context));
      // `splice(0)` empties the buffer and yields its previous contents.
      _earlyMessages.splice(0).forEach((e) => {
        ctx.dispatchEvent(
          new MessageEvent("message", { data: e.data, ports: [...e.ports] }),
        );
      });
    }

    return;
  }

  const c = /** @type {any} */ (context);
  c.__id ??= crypto.randomUUID();

  context.addEventListener(
    "connect",
    /**
     * @param {any} event
     */
    (event) => {
      /** @type {MessagePort} */
      const port = event.ports[0];
      port.start();

      // Initiate setup; only the very first port sees `firstConnection = true`.
      callback(port, !(c.__initiated ?? false), c.__id);
      c.__initiated = true;
    },
  );
}

/**
 * Check for a `SharedWorker` instance without assuming the constructor exists.
 * Referencing a missing global inside `instanceof` throws a `ReferenceError`,
 * hence the `typeof` guard.
 *
 * @param {unknown} value
 * @returns {value is SharedWorker}
 */
function isSharedWorker(value) {
  return typeof SharedWorker !== "undefined" && value instanceof SharedWorker;
}

/**
 * Resolve the object messages should be posted to and listened on.
 * For a `SharedWorker` that is its (started) port; anything else is returned
 * unchanged.
 *
 * @param {Worker | SharedWorker} worker
 */
export function workerLink(worker) {
  if (isSharedWorker(worker)) {
    worker.port.start();
    return worker.port;
  }

  return worker;
}

/**
 * Create an object on which every string property is a function that forwards
 * its arguments to `RpcChannel.callMethod` under that property name.
 * The channel — and therefore the worker link — is only created on the first
 * call.
 *
 * @template {Record<string, (...args: any[]) => any>} Actions
 * @param {() => MessagePort | Worker} workerLinkCreator
 * @returns {ProxiedActions<Actions>}
 */
export function workerProxy(workerLinkCreator) {
  /** @type {RpcChannel<{}, Actions> | undefined} */
  let channel;

  const proxy = new Proxy(/** @type {any} */ ({}), {
    get: (_target, /** @type {string | symbol} */ prop) => {
      // `Actions` is keyed by strings only, so symbol lookups are never
      // actions. Answering `then` with a function would make the proxy a
      // thenable: `await proxy` (or returning it from an async function)
      // would then issue a remote call named "then".
      if (typeof prop !== "string" || prop === "then") return undefined;

      /** @param {Parameters<Actions[any]>} args */
      return (...args) => {
        channel ??= new RpcChannel(workerLinkCreator());
        return channel.callMethod(prop, args);
      };
    },
  });

  return /** @type {ProxiedActions<Actions>} */ (proxy);
}

/**
 * Relay messages between a fresh `MessageChannel` and a lazily created worker.
 * Whatever is posted to the returned `port` is forwarded to the worker, and
 * worker messages are forwarded back to that port. Optional hooks can rewrite
 * the payload (and supply a transfer list) in either direction.
 * The worker is only created when the first message is sent towards it.
 *
 * @param {() => MessagePort | Worker | SharedWorker} workerCreator
 * @param {{ fromWorker?: (message: any) => Promise<{ data: any, transfer?: Transferable[] }>; toWorker?: (message: any) => Promise<{ data: any, transfer?: Transferable[] }> }} [hooks]
 * @returns {Tunnel}
 */
export function workerTunnel(workerCreator, hooks = {}) {
  /** @type {MessagePort | Worker | undefined} */
  let link;

  const channel = new MessageChannel();

  /**
   * Receive from worker and pass the (optionally transformed) payload on.
   *
   * @param {Event} event
   */
  const workerListener = async (event) => {
    const msgEvent = /** @type {MessageEvent} */ (event);
    const { data, transfer } = await hooks?.fromWorker?.(msgEvent.data) ??
      { data: msgEvent.data };
    channel.port1.postMessage(data, { transfer });
  };

  /** Create the worker link on first use and subscribe to its messages. */
  function ensureLink() {
    if (link) return link;

    const workerOrLink = workerCreator();

    link = isSharedWorker(workerOrLink)
      ? workerLink(workerOrLink)
      : workerOrLink;

    link.addEventListener("message", workerListener);

    return link;
  }

  channel.port1.addEventListener("message", async (event) => {
    // Send to worker
    const { data, transfer } = await hooks?.toWorker?.(event.data) ??
      { data: event.data };
    ensureLink().postMessage(data, { transfer });
  });

  channel.port1.start();
  channel.port2.start();

  return {
    disconnect: () => {
      link?.removeEventListener("message", workerListener);
      channel.port1.close();
      channel.port2.close();
    },
    port: channel.port2,
  };
}

////////////////////////////////////////////
// RAW
////////////////////////////////////////////

/**
 * Post a one-way announcement to the given context.
 * Everything `getTransferables` finds in the announcement is placed in the
 * transfer list.
 *
 * @template T
 * @param {string} name
 * @param {T} args
 * @param {MessagePort | Worker | MessengerRealm} [context] Uses `globalThis` by default.
 */
export function announce(
  name,
  args,
  context,
) {
  const a = announcement(name, args);
  const transferables = getTransferables(a);
  (context ?? globalThis).postMessage(a, { transfer: transferables });
}

/**
 * Register `fn` as the handler for announcements called `name`.
 * The first registration on a context attaches a single `message` listener;
 * registering the same name again replaces the previous handler.
 * The handler is wrapped with `debounceMicrotask(fn, { updateArguments: true })`
 * — presumably so bursts within one microtask collapse to the latest
 * arguments; confirm against the library documentation.
 *
 * @template T
 * @param {string} name
 * @param {(args: T) => void} fn
 * @param {MessagePort | Worker | MessengerRealm} [context]
 */
export function listen(
  name,
  fn,
  context = /** @type {MessengerRealm} */ (globalThis),
) {
  const c = /** @type {any} */ (context);

  if (!c.__incoming) {
    context.addEventListener("message", incomingAnnouncementsHandler(context));
    c.__incoming = {};
  }

  c.__incoming[name] = debounceMicrotask(fn, { updateArguments: true });
}

////////////////////////////////////////////
// RPC
////////////////////////////////////////////

/**
 * Open an `RpcChannel` on `context` that exposes `actions`.
 *
 * @template {Record<string, (...args: any[]) => any>} LocalAPI
 * @template {Record<string, (...args: any[]) => any>} RemoteAPI
 * @param {MessagePort | Worker | MessengerRealm} context
 * @param {RemoteAPI} actions
 * @returns {RpcChannel<{}, RemoteAPI>}
 */
export function rpc(context, actions) {
  /** @type {RpcChannel<{}, RemoteAPI>} */
  const channel = new RpcChannel(context, { expose: actions });
  return channel;
}

////////////////////////////////////////////
// ⛔️
////////////////////////////////////////////

const ANNOUNCEMENT = "announcement";

/**
 * Build the message envelope for an announcement. `ns` and `type` both carry
 * the `ANNOUNCEMENT` marker the receiving side filters on; `key` is the
 * `xxh32` hash of a fresh random UUID.
 *
 * @template T
 * @param {string} name
 * @param {T} args
 * @returns {Announcement<T>}
 */
function announcement(name, args) {
  return {
    ns: ANNOUNCEMENT,
    name,
    key: xxh32(crypto.randomUUID()),

    type: ANNOUNCEMENT,
    args,
  };
}

/**
 * Create the `message` listener that routes announcements to the handlers
 * stored in `context.__incoming`. Anything that is not an announcement is
 * ignored.
 *
 * @param {MessagePort | Worker | MessengerRealm} context
 */
function incomingAnnouncementsHandler(context) {
  /** @param {any} event */
  return (event) => {
    const data = event.data;

    // Other traffic on the same context may carry `null`, `undefined` or
    // primitive payloads; none of those can be an announcement.
    if (data === null || typeof data !== "object") return;
    if (data.ns !== ANNOUNCEMENT || data.type !== ANNOUNCEMENT) return;

    const incoming = /** @type {Announcement<any>} */ (data);
    const c = /** @type {any} */ (context);

    // Only own entries count, so a name such as "constructor" or "__proto__"
    // cannot reach members inherited from `Object.prototype`.
    if (!Object.hasOwn(c.__incoming, incoming.name)) return;

    const handler = c.__incoming[incoming.name];
    if (typeof handler === "function") handler(incoming.args);
  };
}