forked from
tokono.ma/diffuse
A music player that connects to your cloud/distributed storage.
1/// <reference lib="webworker" />
2import { getTransferables } from "@okikio/transferables";
3import { debounceMicrotask } from "@vicary/debounce-microtask";
4import { xxh32 } from "xxh32";
5
6import { RpcChannel } from "./worker/rpc-channel.js";
7
8export { getTransferables } from "@okikio/transferables";
9
10/**
11 * @import {Announcement, MessengerRealm, ProxiedActions, Tunnel} from "./worker.d.ts"
12 */
13
14// Early message buffer for regular Workers.
15//
16// If a Worker module (or its dependencies) contains a top-level `await`, the
17// browser can deliver queued incoming messages to `globalThis` while the module
18// evaluation is paused — before `ostiary`/`rpc()` has had a chance to register
19// a handler. Those messages would otherwise be silently dropped.
20//
21// This buffer captures such messages the moment this module is imported (which
22// happens before any top-level `await` pause) and replays them once `ostiary`
23// sets up the real handler.
24//
25// Detection: regular Workers are instances of DedicatedWorkerGlobalScope.
26// Previously we checked `globalThis.onmessage === null`, but Safari initialises
27// that property as `undefined` rather than `null`, causing the check to fail.
28
/**
 * Messages captured before `ostiary` had a chance to install the real handler.
 *
 * @type {MessageEvent[]}
 */
const _earlyMessages = [];

/**
 * Detaches the temporary buffering listener. Stays `null` when no such
 * listener was installed, i.e. when not running in a dedicated worker scope.
 *
 * @type {null | (() => void)}
 */
let _flushEarlyMessages = null;

if (
  typeof DedicatedWorkerGlobalScope !== "undefined" &&
  globalThis instanceof DedicatedWorkerGlobalScope
) {
  /**
   * Stores every incoming message so it can be replayed later.
   *
   * @type {EventListener}
   */
  const bufferMessage = function (event) {
    _earlyMessages.push(/** @type {MessageEvent} */ (event));
  };

  globalThis.addEventListener("message", bufferMessage);

  _flushEarlyMessages = function () {
    globalThis.removeEventListener("message", bufferMessage);
  };
}
49
50////////////////////////////////////////////
51// MISC
52////////////////////////////////////////////
53
/**
 * Gatekeeper for incoming worker connections.
 *
 * When `context` is a dedicated (regular) worker scope, `callback` is invoked
 * immediately with that scope and a fresh connection id, after which any
 * messages buffered during module evaluation are replayed on the scope.
 *
 * Otherwise (the shared worker case) a `connect` listener is installed; each
 * connecting port is started and passed to `callback`. All ports of the same
 * context share the id stored in `context.__id`, and only the first connection
 * is flagged as such.
 *
 * @template {MessagePort | Worker | MessengerRealm} T
 * @param {(context: MessagePort | T, firstConnection: boolean, connectionId: string) => void} callback
 * @param {T} [context] Uses `globalThis` by default.
 */
export function ostiary(
  callback,
  context = /** @type {T} */ (/** @type {unknown} */ (globalThis)),
) {
  const isDedicatedScope = typeof DedicatedWorkerGlobalScope !== "undefined" &&
    context instanceof DedicatedWorkerGlobalScope;

  if (!isDedicatedScope) {
    // Bookkeeping (`__id`, `__initiated`) lives on the context object itself.
    const state = /** @type {any} */ (context);
    state.__id ??= crypto.randomUUID();

    /** @param {any} event */
    const onConnect = (event) => {
      /** @type {MessagePort} */
      const [port] = event.ports;
      port.start();

      // Only mark the context as initiated once the callback has run.
      const isFirstConnection = !state.__initiated;
      callback(port, isFirstConnection, state.__id);
      state.__initiated = true;
    };

    context.addEventListener("connect", onConnect);
    return;
  }

  // Let the callback register its handlers first, so the replay below reaches them.
  callback(context, true, crypto.randomUUID());

  if (!_flushEarlyMessages) return;

  // Stop buffering, then re-dispatch everything that arrived too early.
  _flushEarlyMessages();
  _flushEarlyMessages = null;

  const target = /** @type {EventTarget} */ (/** @type {unknown} */ (context));

  for (const early of _earlyMessages.splice(0)) {
    const replay = new MessageEvent("message", {
      data: early.data,
      ports: [...early.ports],
    });

    target.dispatchEvent(replay);
  }
}
106
/**
 * Resolve the object that messages are exchanged through for a given worker.
 *
 * A shared worker communicates through its `port`, which is started here and
 * returned. Any other worker is returned as-is.
 *
 * @param {Worker | SharedWorker} worker
 * @returns {Worker | MessagePort}
 */
export function workerLink(worker) {
  // `SharedWorker` is not defined in every environment. Referencing the bare
  // identifier without the `typeof` guard would throw a `ReferenceError` there,
  // even for plain workers.
  if (typeof SharedWorker !== "undefined" && worker instanceof SharedWorker) {
    worker.port.start();
    return worker.port;
  }

  return worker;
}
118
/**
 * Build a proxy whose every property is a function that forwards the call,
 * by property name, to an `RpcChannel`.
 *
 * The channel (and therefore the worker link) is created lazily on the first
 * call and reused afterwards.
 *
 * @template {Record<string, (...args: any[]) => any>} Actions
 * @param {() => MessagePort | Worker} workerLinkCreator
 * @returns {ProxiedActions<Actions>}
 */
export function workerProxy(workerLinkCreator) {
  /** @type {RpcChannel<{}, Actions> | undefined} */
  let rpcChannel;

  // Creates the channel on first use only.
  const connect = () => {
    rpcChannel ??= new RpcChannel(workerLinkCreator());
    return rpcChannel;
  };

  /** @type {ProxyHandler<any>} */
  const traps = {
    get(_target, /** @type {string} */ prop) {
      /** @param {Parameters<Actions[any]>} args */
      const invoke = (...args) => connect().callMethod(prop, args);
      return invoke;
    },
  };

  return /** @type {ProxiedActions<Actions>} */ (new Proxy({}, traps));
}
140
/**
 * Create a `MessageChannel` that relays messages to and from a worker.
 *
 * Messages posted to the returned `port` are forwarded to the worker link;
 * messages coming from the worker link are forwarded back to that `port`.
 * The worker link is created lazily, when the first message needs to be sent
 * to the worker, and only from then on is the worker listened to.
 *
 * @param {() => MessagePort | Worker | SharedWorker} workerCreator
 * @param {{ fromWorker?: (message: any) => Promise<{ data: any, transfer?: Transferable[] }>; toWorker?: (message: any) => Promise<{ data: any, transfer?: Transferable[] }> }} [hooks]
 *   Optional transforms applied to a message before it is forwarded. When a
 *   hook is absent (or resolves to a nullish value) the message is passed
 *   through untouched.
 * @returns {Tunnel} The outward-facing `port` and a `disconnect` function that
 *   stops listening to the worker link and closes both channel ports.
 */
export function workerTunnel(workerCreator, hooks = {}) {
  /** @type {MessagePort | Worker | undefined} */
  let link;

  const channel = new MessageChannel();

  /**
   * Worker → tunnel port.
   *
   * @param {Event} event
   */
  const workerListener = async (event) => {
    const msgEvent = /** @type {MessageEvent} */ (event);
    const { data, transfer } = await hooks?.fromWorker?.(msgEvent.data) ??
      { data: msgEvent.data };
    channel.port1.postMessage(data, { transfer });
  };

  /** Creates the worker link on first use and starts listening to it. */
  function ensureLink() {
    if (link) return link;

    const workerOrLink = workerCreator();

    // `SharedWorker` is not defined in every environment. Without the `typeof`
    // guard the bare identifier throws a `ReferenceError` inside the async
    // listener below, and no message would ever reach the worker.
    link = typeof SharedWorker !== "undefined" &&
        workerOrLink instanceof SharedWorker
      ? workerLink(workerOrLink)
      : /** @type {MessagePort | Worker} */ (workerOrLink);

    link.addEventListener("message", workerListener);

    return link;
  }

  // Tunnel port → worker.
  channel.port1.addEventListener("message", async (event) => {
    const { data, transfer } = await hooks?.toWorker?.(event.data) ??
      { data: event.data };
    ensureLink().postMessage(data, { transfer });
  });

  channel.port1.start();
  channel.port2.start();

  return {
    disconnect: () => {
      link?.removeEventListener("message", workerListener);
      channel.port1.close();
      channel.port2.close();
    },
    port: channel.port2,
  };
}
196
197////////////////////////////////////////////
198// RAW
199////////////////////////////////////////////
200
/**
 * Post a named announcement, carrying `args`, to `context`.
 *
 * Whatever `getTransferables` finds in the announcement is handed over as the
 * transfer list.
 *
 * @template T
 * @param {string} name
 * @param {T} args
 * @param {MessagePort | Worker | MessengerRealm} [context] Uses `globalThis` by default.
 */
export function announce(name, args, context) {
  const message = announcement(name, args);
  const transfer = getTransferables(message);
  const target = context ?? globalThis;

  target.postMessage(message, { transfer });
}
216
/**
 * Register `fn` as the handler for announcements called `name` on `context`.
 *
 * The first registration on a context installs a single `message` listener
 * and creates the handler registry (`context.__incoming`); later registrations
 * only update the registry. Registering the same `name` again replaces the
 * previous handler. Each handler is wrapped with `debounceMicrotask`
 * (`updateArguments: true`).
 *
 * @template T
 * @param {string} name
 * @param {(args: T) => void} fn
 * @param {MessagePort | Worker | MessengerRealm} [context] Uses `globalThis` by default.
 */
export function listen(
  name,
  fn,
  context = /** @type {MessengerRealm} */ (globalThis),
) {
  const c = /** @type {any} */ (context);

  if (!c.__incoming) {
    context.addEventListener("message", incomingAnnouncementsHandler(context));

    // Null-prototype registry: announcement names arrive from the other side
    // of the channel, so a name such as `toString` or `__proto__` must not
    // resolve to something inherited from `Object.prototype`.
    c.__incoming = Object.create(null);
  }

  c.__incoming[name] = debounceMicrotask(fn, { updateArguments: true });
}
237
238////////////////////////////////////////////
239// RPC
240////////////////////////////////////////////
241
/**
 * Open an `RpcChannel` on `context`, passing `actions` as the channel's
 * `expose` option.
 *
 * @template {Record<string, (...args: any[]) => any>} LocalAPI
 * @template {Record<string, (...args: any[]) => any>} RemoteAPI
 * @param {MessagePort | Worker | MessengerRealm} context
 * @param {RemoteAPI} actions
 * @returns {RpcChannel<{}, RemoteAPI>}
 */
export function rpc(context, actions) {
  const options = { expose: actions };

  return /** @type {RpcChannel<{}, RemoteAPI>} */ (
    new RpcChannel(context, options)
  );
}
254
255////////////////////////////////////////////
256// ⛔️
257////////////////////////////////////////////
258
// Marker stored in both the `ns` and `type` fields of every announcement.
const ANNOUNCEMENT = "announcement";

/**
 * Build the message object for an announcement.
 *
 * The `key` is the `xxh32` hash of a freshly generated UUID.
 *
 * @template T
 * @param {string} name
 * @param {T} args
 * @returns {Announcement<T>}
 */
function announcement(name, args) {
  const key = xxh32(crypto.randomUUID());

  /** @type {Announcement<T>} */
  const message = {
    ns: ANNOUNCEMENT,
    name,
    key,

    type: ANNOUNCEMENT,
    args,
  };

  return message;
}
277
/**
 * Create the `message` listener that dispatches announcements to the handlers
 * registered in `context.__incoming`.
 *
 * Messages that are not announcements, and announcements nobody registered a
 * handler for, are ignored.
 *
 * @param {MessagePort | Worker | MessengerRealm} context
 * @returns {(event: any) => void}
 */
function incomingAnnouncementsHandler(context) {
  const c = /** @type {any} */ (context);

  /** @param {any} event */
  return (event) => {
    const data = event.data;

    // This listener receives every message posted to the context, not only
    // announcements. A `null`/`undefined`/primitive payload must be skipped
    // rather than destructured (destructuring `null` throws a `TypeError`).
    if (data === null || typeof data !== "object") return;
    if (data.ns !== ANNOUNCEMENT || data.type !== ANNOUNCEMENT) return;

    const { name, args } = /** @type {Announcement<any>} */ (data);
    const handlers = c.__incoming;

    // Own properties only: the name comes from the other side of the channel
    // and must not resolve to an inherited member such as `toString` or
    // `__proto__`.
    if (!handlers || !Object.hasOwn(handlers, name)) return;

    handlers[name]?.(args);
  };
}