forked from
tokono.ma/diffuse
A music player that connects to your cloud/distributed storage.
1import { getTransferables } from "@okikio/transferables";
2import { debounceMicrotask } from "@vicary/debounce-microtask";
3import { xxh32 } from "xxh32";
4
5import { RpcChannel } from "./worker/rpc-channel.js";
6
7export { getTransferables } from "@okikio/transferables";
8
9/**
10 * @import {Announcement, MessengerRealm, ProxiedActions, Tunnel} from "./worker.d.ts"
11 */
12
13// Early message buffer for regular Workers.
14//
15// If a Worker module (or its dependencies) contains a top-level `await`, the
16// browser can deliver queued incoming messages to `globalThis` while the module
17// evaluation is paused — before `ostiary`/`rpc()` has had a chance to register
18// a handler. Those messages would otherwise be silently dropped.
19//
20// This buffer captures such messages the moment this module is imported (which
21// happens before any top-level `await` pause) and replays them once `ostiary`
22// sets up the real handler.
23//
24// Detection: regular Workers have `globalThis.onmessage === null`; the main
25// thread and SharedWorkers do not.
26
27/** @type {MessageEvent[]} */
28const _earlyMessages = [];
29
30/** @type {null | (() => void)} */
31let _flushEarlyMessages = null;
32
33if (/** @type {any} */ (globalThis).onmessage === null) {
34 const handler = /** @type {EventListener} */ ((event) => {
35 _earlyMessages.push(/** @type {MessageEvent} */ (event));
36 });
37
38 globalThis.addEventListener("message", handler);
39
40 _flushEarlyMessages = () => {
41 globalThis.removeEventListener("message", handler);
42 };
43}
44
45////////////////////////////////////////////
46// MISC
47////////////////////////////////////////////
48
49/**
50 * Manage incoming connections for a shared worker.
51 * If a regular worker is used instead, it'll just execute the callback immediately.
52 *
53 * @template {MessagePort | Worker | MessengerRealm} T
54 * @param {(context: MessagePort | T, firstConnection: boolean, connectionId: string) => void} callback
55 * @param {T} [context] Uses `globalThis` by default.
56 */
57export function ostiary(
58 callback,
59 context = /** @type {T} */ (/** @type {unknown} */ (globalThis)),
60) {
61 if (/** @type {any} */ (context).onmessage === null) {
62 callback(context, true, crypto.randomUUID());
63
64 // Replay any messages that arrived before the handler was registered.
65 if (_flushEarlyMessages) {
66 _flushEarlyMessages();
67 _flushEarlyMessages = null;
68 const ctx = /** @type {EventTarget} */ (/** @type {unknown} */ (context));
69 _earlyMessages.splice(0).forEach((e) => {
70 ctx.dispatchEvent(
71 new MessageEvent("message", { data: e.data, ports: [...e.ports] }),
72 );
73 });
74 }
75
76 return;
77 }
78
79 const c = /** @type {any} */ (context);
80 c.__id ??= crypto.randomUUID();
81
82 context.addEventListener(
83 "connect",
84 /**
85 * @param {any} event
86 */
87 (event) => {
88 /** @type {MessagePort} */
89 const port = event.ports[0];
90 port.start();
91
92 // Initiate setup
93 callback(port, !(c.__initiated ?? false), c.__id);
94 c.__initiated = true;
95 },
96 );
97}
98
99/**
100 * @param {Worker | SharedWorker} worker
101 */
102export function workerLink(worker) {
103 if (worker instanceof SharedWorker) {
104 worker.port.start();
105 return worker.port;
106 } else {
107 return worker;
108 }
109}
110
111/**
112 * @template {Record<string, (...args: any[]) => any>} Actions
113 * @param {() => MessagePort | Worker} workerLinkCreator
114 * @returns {ProxiedActions<Actions>}
115 */
116export function workerProxy(workerLinkCreator) {
117 /** @type {RpcChannel<{}, Actions> | undefined} */
118 let channel;
119
120 const proxy = new Proxy(/** @type {any} */ ({}), {
121 get: (_target, /** @type {string} */ prop) => {
122 /** @param {Parameters<Actions[any]>} args */
123 return (...args) => {
124 channel ??= new RpcChannel(workerLinkCreator());
125 return channel.callMethod(prop, args);
126 };
127 },
128 });
129
130 return /** @type {ProxiedActions<Actions>} */ (proxy);
131}
132
133/**
134 * @param {() => MessagePort | Worker | SharedWorker} workerCreator
135 * @param {{ fromWorker?: (message: any) => Promise<{ data: any, transfer?: Transferable[] }>; toWorker?: (message: any) => Promise<{ data: any, transfer?: Transferable[] }> }} [hooks]
136 * @returns {Tunnel}
137 */
138export function workerTunnel(workerCreator, hooks = {}) {
139 /** @type {MessagePort | Worker | undefined} */
140 let link;
141
142 const channel = new MessageChannel();
143
144 function ensureLink() {
145 if (link) return link;
146
147 const workerOrLink = workerCreator();
148
149 link = workerOrLink instanceof SharedWorker
150 ? workerLink(workerOrLink)
151 : workerOrLink;
152
153 link.addEventListener("message", workerListener);
154
155 return link;
156 }
157
158 channel.port1.addEventListener("message", async (event) => {
159 // Send to worker
160 const { data, transfer } = await hooks?.toWorker?.(event.data) ??
161 { data: event.data };
162 ensureLink().postMessage(data, { transfer });
163 });
164
165 /**
166 * @param {Event} event
167 */
168 const workerListener = async (event) => {
169 // Receive from worker
170 const msgEvent = /** @type {MessageEvent} */ (event);
171 const { data, transfer } = await hooks?.fromWorker?.(msgEvent.data) ??
172 { data: msgEvent.data };
173 channel.port1.postMessage(data, { transfer });
174 };
175
176 channel.port1.start();
177 channel.port2.start();
178
179 return {
180 disconnect: () => {
181 link?.removeEventListener("message", workerListener);
182 channel.port1.close();
183 channel.port2.close();
184 },
185 port: channel.port2,
186 };
187}
188
189////////////////////////////////////////////
190// RAW
191////////////////////////////////////////////
192
193/**
194 * @template T
195 * @param {string} name
196 * @param {T} args
197 * @param {MessagePort | Worker | MessengerRealm} [context] Uses `globalThis` by default.
198 */
199export function announce(
200 name,
201 args,
202 context,
203) {
204 const a = announcement(name, args);
205 const transferables = getTransferables(a);
206 (context ?? globalThis).postMessage(a, { transfer: transferables });
207}
208
209/**
210 * @template T
211 * @param {string} name
212 * @param {(args: T) => void} fn
213 * @param {MessagePort | Worker | MessengerRealm} [context]
214 */
215export function listen(
216 name,
217 fn,
218 context = /** @type {MessengerRealm} */ (globalThis),
219) {
220 const c = /** @type {any} */ (context);
221
222 if (!c.__incoming) {
223 context.addEventListener("message", incomingAnnouncementsHandler(context));
224 c.__incoming = {};
225 }
226
227 c.__incoming[name] = debounceMicrotask(fn, { updateArguments: true });
228}
229
230////////////////////////////////////////////
231// RPC
232////////////////////////////////////////////
233
234/**
235 * @template {Record<string, (...args: any[]) => any>} LocalAPI
236 * @template {Record<string, (...args: any[]) => any>} RemoteAPI
237 * @param {MessagePort | Worker | MessengerRealm} context
238 * @param {RemoteAPI} actions
239 * @returns {RpcChannel<{}, RemoteAPI>}
240 */
241export function rpc(context, actions) {
242 /** @type {RpcChannel<{}, RemoteAPI>} */
243 const channel = new RpcChannel(context, { expose: actions });
244 return channel;
245}
246
247////////////////////////////////////////////
248// ⛔️
249////////////////////////////////////////////
250
251const ANNOUNCEMENT = "announcement";
252
253/**
254 * @template T
255 * @param {string} name
256 * @param {T} args
257 * @returns {Announcement<T>}
258 */
259function announcement(name, args) {
260 return {
261 ns: ANNOUNCEMENT,
262 name,
263 key: xxh32(crypto.randomUUID()),
264
265 type: ANNOUNCEMENT,
266 args,
267 };
268}
269
270/**
271 * @param {MessagePort | Worker | MessengerRealm} context
272 */
273function incomingAnnouncementsHandler(context) {
274 /** @param {any} event */
275 return (event) => {
276 const { ns, type } = event.data;
277 if (ns !== ANNOUNCEMENT || type !== ANNOUNCEMENT) return;
278 const announcement = /** @type {Announcement<any>} */ (event.data);
279 const c = /** @type {any} */ (context);
280 c.__incoming[announcement.name]?.(announcement.args);
281 };
282}