// Forked from tokono.ma/diffuse
// A music player that connects to your cloud/distributed storage.
1import { getTransferables } from "@okikio/transferables";
2import { debounceMicrotask } from "@vicary/debounce-microtask";
3import { xxh32 } from "xxh32";
4
5import { RpcChannel } from "./worker/rpc-channel.js";
6
7export { getTransferables } from "@okikio/transferables";
8
9/**
10 * @import {Announcement, MessengerRealm, ProxiedActions, Tunnel} from "./worker.d.ts"
11 */
12
13// Early message buffer for regular Workers.
14//
15// If a Worker module (or its dependencies) contains a top-level `await`, the
16// browser can deliver queued incoming messages to `globalThis` while the module
17// evaluation is paused — before `ostiary`/`rpc()` has had a chance to register
18// a handler. Those messages would otherwise be silently dropped.
19//
20// This buffer captures such messages the moment this module is imported (which
21// happens before any top-level `await` pause) and replays them once `ostiary`
22// sets up the real handler.
23//
24// Detection: regular Workers are instances of DedicatedWorkerGlobalScope.
25// Previously we checked `globalThis.onmessage === null`, but Safari initialises
26// that property as `undefined` rather than `null`, causing the check to fail.
27
/**
 * Messages received on `globalThis` before `ostiary` took over.
 *
 * @type {MessageEvent[]}
 */
const _earlyMessages = [];

/**
 * Stops the buffering listener. Stays `null` when no buffering listener was
 * installed.
 *
 * @type {null | (() => void)}
 */
let _flushEarlyMessages = null;

if (typeof DedicatedWorkerGlobalScope !== "undefined") {
  if (globalThis instanceof DedicatedWorkerGlobalScope) {
    /** @type {EventListener} */
    const bufferMessage = (incoming) => {
      _earlyMessages.push(/** @type {MessageEvent} */ (incoming));
    };

    // Start buffering right away, at module evaluation time.
    globalThis.addEventListener("message", bufferMessage);

    _flushEarlyMessages = () => {
      globalThis.removeEventListener("message", bufferMessage);
    };
  }
}
48
49////////////////////////////////////////////
50// MISC
51////////////////////////////////////////////
52
/**
 * Gatekeeper for incoming worker connections.
 *
 * - Dedicated worker scope: the callback runs once, immediately, with the
 *   context itself; afterwards any buffered early messages are re-dispatched
 *   on the context.
 * - Anything else: the callback runs for every `connect` event, receiving the
 *   first port of that event (which is started beforehand).
 *
 * @template {MessagePort | Worker | MessengerRealm} T
 * @param {(context: MessagePort | T, firstConnection: boolean, connectionId: string) => void} callback
 * @param {T} [context] Uses `globalThis` by default.
 */
export function ostiary(
  callback,
  context = /** @type {T} */ (/** @type {unknown} */ (globalThis)),
) {
  const isDedicatedWorker = typeof DedicatedWorkerGlobalScope !== "undefined" &&
    context instanceof DedicatedWorkerGlobalScope;

  if (!isDedicatedWorker) {
    // Bookkeeping is stored on the context object itself.
    const state = /** @type {any} */ (context);
    state.__id ??= crypto.randomUUID();

    /** @param {any} connectEvent */
    const onConnect = (connectEvent) => {
      /** @type {MessagePort} */
      const incomingPort = connectEvent.ports[0];
      incomingPort.start();

      // Only the very first connection is flagged as such; the flag is
      // flipped after the callback has run.
      const isFirstConnection = !(state.__initiated ?? false);
      callback(incomingPort, isFirstConnection, state.__id);
      state.__initiated = true;
    };

    context.addEventListener("connect", onConnect);
    return;
  }

  callback(context, true, crypto.randomUUID());

  // Nothing buffered (or already replayed): done.
  if (!_flushEarlyMessages) return;

  // Stop buffering, then hand the buffered messages to whatever listeners the
  // callback registered.
  _flushEarlyMessages();
  _flushEarlyMessages = null;

  const target = /** @type {EventTarget} */ (/** @type {unknown} */ (context));

  for (const buffered of _earlyMessages.splice(0)) {
    const replay = new MessageEvent("message", {
      data: buffered.data,
      ports: [...buffered.ports],
    });

    target.dispatchEvent(replay);
  }
}
105
/**
 * Resolve the object used to exchange messages with a worker.
 *
 * A `SharedWorker` communicates through its `port`; that port is started and
 * returned. Any other value is returned unchanged.
 *
 * The `SharedWorker` global does not exist in every environment, so its
 * presence is checked before `instanceof` is evaluated; referencing an
 * undefined global would throw a `ReferenceError`.
 *
 * @param {Worker | SharedWorker} worker
 * @returns {MessagePort | Worker}
 */
export function workerLink(worker) {
  if (typeof SharedWorker !== "undefined" && worker instanceof SharedWorker) {
    worker.port.start();
    return worker.port;
  }

  return worker;
}
117
/**
 * Build a proxy object on which every string property is a function that
 * forwards the call to a worker through an `RpcChannel`.
 *
 * The channel (and therefore the worker link) is created lazily, on the first
 * call made through the proxy, and reused afterwards.
 *
 * Symbol keys and the `then` key resolve to `undefined`. Without this, the
 * proxy would look like a thenable: awaiting it or returning it from an async
 * function would invoke `then` and dispatch it as a remote call.
 *
 * @template {Record<string, (...args: any[]) => any>} Actions
 * @param {() => MessagePort | Worker} workerLinkCreator
 * @returns {ProxiedActions<Actions>}
 */
export function workerProxy(workerLinkCreator) {
  /** @type {RpcChannel<{}, Actions> | undefined} */
  let channel;

  const proxy = new Proxy(/** @type {any} */ ({}), {
    get: (_target, /** @type {string | symbol} */ prop) => {
      // Not remote actions: well-known symbols and the thenable probe.
      if (typeof prop !== "string" || prop === "then") return undefined;

      /** @param {Parameters<Actions[any]>} args */
      return (...args) => {
        channel ??= new RpcChannel(workerLinkCreator());
        return channel.callMethod(prop, args);
      };
    },
  });

  return /** @type {ProxiedActions<Actions>} */ (proxy);
}
139
/**
 * Create a tunnel: a `MessageChannel` whose outer port (`port`) relays
 * messages to and from a worker.
 *
 * The worker (or link) is created lazily, when the first message is sent
 * through the tunnel's port. Messages coming from the worker are only relayed
 * once that link exists.
 *
 * Optional hooks can rewrite a message in either direction; each resolves to
 * the data to post plus an optional transfer list. When a hook is absent (or
 * resolves to a nullish value) the message data is forwarded unchanged.
 *
 * @param {() => MessagePort | Worker | SharedWorker} workerCreator
 * @param {{ fromWorker?: (message: any) => Promise<{ data: any, transfer?: Transferable[] }>; toWorker?: (message: any) => Promise<{ data: any, transfer?: Transferable[] }> }} [hooks]
 * @returns {Tunnel}
 */
export function workerTunnel(workerCreator, hooks = {}) {
  /** @type {MessagePort | Worker | undefined} */
  let link;

  const channel = new MessageChannel();

  function ensureLink() {
    if (link) return link;

    const workerOrLink = workerCreator();

    // `SharedWorker` is not defined in every environment; check for it before
    // using `instanceof`, which would otherwise throw a `ReferenceError`.
    const isSharedWorker = typeof SharedWorker !== "undefined" &&
      workerOrLink instanceof SharedWorker;

    link = isSharedWorker ? workerLink(workerOrLink) : workerOrLink;

    link.addEventListener("message", workerListener);

    return link;
  }

  channel.port1.addEventListener("message", async (event) => {
    // Send to worker
    const { data, transfer } = await hooks?.toWorker?.(event.data) ??
      { data: event.data };
    ensureLink().postMessage(data, { transfer });
  });

  /**
   * @param {Event} event
   */
  const workerListener = async (event) => {
    // Receive from worker
    const msgEvent = /** @type {MessageEvent} */ (event);
    const { data, transfer } = await hooks?.fromWorker?.(msgEvent.data) ??
      { data: msgEvent.data };
    channel.port1.postMessage(data, { transfer });
  };

  channel.port1.start();
  channel.port2.start();

  return {
    // Stop relaying worker messages and close both ends of the channel.
    disconnect: () => {
      link?.removeEventListener("message", workerListener);
      channel.port1.close();
      channel.port2.close();
    },
    port: channel.port2,
  };
}
195
196////////////////////////////////////////////
197// RAW
198////////////////////////////////////////////
199
/**
 * Post an announcement message, transferring whatever transferables are found
 * in it.
 *
 * @template T
 * @param {string} name
 * @param {T} args
 * @param {MessagePort | Worker | MessengerRealm} [context] Uses `globalThis` by default.
 */
export function announce(name, args, context) {
  const message = announcement(name, args);
  const transfer = getTransferables(message);
  const target = context ?? globalThis;

  target.postMessage(message, { transfer });
}
215
/**
 * Register `fn` as the listener for announcements called `name`.
 *
 * The first registration on a context attaches a single `message` listener
 * and creates the `__incoming` registry on that context. Registering the same
 * name again replaces the previous listener.
 *
 * @template T
 * @param {string} name
 * @param {(args: T) => void} fn
 * @param {MessagePort | Worker | MessengerRealm} [context]
 */
export function listen(
  name,
  fn,
  context = /** @type {MessengerRealm} */ (globalThis),
) {
  const registry = /** @type {any} */ (context);
  const needsSetup = !registry.__incoming;

  if (needsSetup) {
    const dispatcher = incomingAnnouncementsHandler(context);
    context.addEventListener("message", dispatcher);
    registry.__incoming = {};
  }

  const debounced = debounceMicrotask(fn, { updateArguments: true });
  registry.__incoming[name] = debounced;
}
236
237////////////////////////////////////////////
238// RPC
239////////////////////////////////////////////
240
/**
 * Open an `RpcChannel` on `context`, handing `actions` to it as the `expose`
 * option.
 *
 * @template {Record<string, (...args: any[]) => any>} LocalAPI
 * @template {Record<string, (...args: any[]) => any>} RemoteAPI
 * @param {MessagePort | Worker | MessengerRealm} context
 * @param {RemoteAPI} actions
 * @returns {RpcChannel<{}, RemoteAPI>}
 */
export function rpc(context, actions) {
  const options = { expose: actions };

  return /** @type {RpcChannel<{}, RemoteAPI>} */ (
    new RpcChannel(context, options)
  );
}
253
254////////////////////////////////////////////
255// ⛔️
256////////////////////////////////////////////
257
// Marker written to (and checked against) both the `ns` and `type` fields of
// an announcement message.
const ANNOUNCEMENT = "announcement";
259
/**
 * Build the message object for an announcement.
 *
 * The `key` is the `xxh32` hash of a freshly generated UUID, so every call
 * hashes a different UUID.
 *
 * @template T
 * @param {string} name
 * @param {T} args
 * @returns {Announcement<T>}
 */
function announcement(name, args) {
  const key = xxh32(crypto.randomUUID());

  /** @type {Announcement<T>} */
  const message = {
    ns: ANNOUNCEMENT,
    name,
    key,

    type: ANNOUNCEMENT,
    args,
  };

  return message;
}
276
/**
 * Build the `message` listener that routes announcements to the listeners
 * stored in `context.__incoming`.
 *
 * Messages are ignored when:
 * - their payload is not an object (`null`, `undefined` or a primitive);
 * - their `ns` or `type` field is not the announcement marker;
 * - no listener is registered under the announcement's name. Only own
 *   properties of the registry count, so a name such as `constructor` never
 *   resolves to a method inherited from `Object.prototype`.
 *
 * @param {MessagePort | Worker | MessengerRealm} context
 */
function incomingAnnouncementsHandler(context) {
  /** @param {any} event */
  return (event) => {
    const message = event.data;

    // Destructuring a nullish payload would throw; such messages cannot be
    // announcements anyway.
    if (message === null || typeof message !== "object") return;
    if (message.ns !== ANNOUNCEMENT || message.type !== ANNOUNCEMENT) return;

    const { name, args } = /** @type {Announcement<any>} */ (message);
    const handlers = /** @type {any} */ (context).__incoming;

    if (!handlers || !Object.hasOwn(handlers, name)) return;

    handlers[name]?.(args);
  };
}