///
import { getTransferables } from "@okikio/transferables";
import { debounceMicrotask } from "@vicary/debounce-microtask";
import { xxh32 } from "xxh32";
import { RpcChannel } from "./worker/rpc-channel.js";
export { getTransferables } from "@okikio/transferables";
/**
* @import {Announcement, MessengerRealm, ProxiedActions, Tunnel} from "./worker.d.ts"
*/
// Early message buffer for regular Workers.
//
// If a Worker module (or its dependencies) contains a top-level `await`, the
// browser can deliver queued incoming messages to `globalThis` while the module
// evaluation is paused — before `ostiary`/`rpc()` has had a chance to register
// a handler. Those messages would otherwise be silently dropped.
//
// This buffer captures such messages the moment this module is imported (which
// happens before any top-level `await` pause) and replays them once `ostiary`
// sets up the real handler.
//
// Detection: regular Workers are instances of DedicatedWorkerGlobalScope.
// Previously we checked `globalThis.onmessage === null`, but Safari initialises
// that property as `undefined` rather than `null`, causing the check to fail.
/** @type {MessageEvent[]} */
const _earlyMessages = [];
/** @type {null | (() => void)} */
let _flushEarlyMessages = null;
if (
typeof DedicatedWorkerGlobalScope !== "undefined" &&
globalThis instanceof DedicatedWorkerGlobalScope
) {
const handler = /** @type {EventListener} */ ((event) => {
_earlyMessages.push(/** @type {MessageEvent} */ (event));
});
globalThis.addEventListener("message", handler);
_flushEarlyMessages = () => {
globalThis.removeEventListener("message", handler);
};
}
////////////////////////////////////////////
// MISC
////////////////////////////////////////////
/**
* Manage incoming connections for a shared worker.
* If a regular worker is used instead, it'll just execute the callback immediately.
*
* @template {MessagePort | Worker | MessengerRealm} T
* @param {(context: MessagePort | T, firstConnection: boolean, connectionId: string) => void} callback
* @param {T} [context] Uses `globalThis` by default.
*/
export function ostiary(
callback,
context = /** @type {T} */ (/** @type {unknown} */ (globalThis)),
) {
if (
typeof DedicatedWorkerGlobalScope !== "undefined" &&
context instanceof DedicatedWorkerGlobalScope
) {
callback(context, true, crypto.randomUUID());
// Replay any messages that arrived before the handler was registered.
if (_flushEarlyMessages) {
_flushEarlyMessages();
_flushEarlyMessages = null;
const ctx = /** @type {EventTarget} */ (/** @type {unknown} */ (context));
_earlyMessages.splice(0).forEach((e) => {
ctx.dispatchEvent(
new MessageEvent("message", { data: e.data, ports: [...e.ports] }),
);
});
}
return;
}
const c = /** @type {any} */ (context);
c.__id ??= crypto.randomUUID();
context.addEventListener(
"connect",
/**
* @param {any} event
*/
(event) => {
/** @type {MessagePort} */
const port = event.ports[0];
port.start();
// Initiate setup
callback(port, !(c.__initiated ?? false), c.__id);
c.__initiated = true;
},
);
}
/**
* @param {Worker | SharedWorker} worker
*/
export function workerLink(worker) {
if (worker instanceof SharedWorker) {
worker.port.start();
return worker.port;
} else {
return worker;
}
}
/**
* @template {Record any>} Actions
* @param {() => MessagePort | Worker} workerLinkCreator
* @returns {ProxiedActions}
*/
export function workerProxy(workerLinkCreator) {
/** @type {RpcChannel<{}, Actions> | undefined} */
let channel;
const proxy = new Proxy(/** @type {any} */ ({}), {
get: (_target, /** @type {string} */ prop) => {
/** @param {Parameters} args */
return (...args) => {
channel ??= new RpcChannel(workerLinkCreator());
return channel.callMethod(prop, args);
};
},
});
return /** @type {ProxiedActions} */ (proxy);
}
/**
* @param {() => MessagePort | Worker | SharedWorker} workerCreator
* @param {{ fromWorker?: (message: any) => Promise<{ data: any, transfer?: Transferable[] }>; toWorker?: (message: any) => Promise<{ data: any, transfer?: Transferable[] }> }} [hooks]
* @returns {Tunnel}
*/
export function workerTunnel(workerCreator, hooks = {}) {
/** @type {MessagePort | Worker | undefined} */
let link;
const channel = new MessageChannel();
function ensureLink() {
if (link) return link;
const workerOrLink = workerCreator();
link = workerOrLink instanceof SharedWorker
? workerLink(workerOrLink)
: workerOrLink;
link.addEventListener("message", workerListener);
return link;
}
channel.port1.addEventListener("message", async (event) => {
// Send to worker
const { data, transfer } = await hooks?.toWorker?.(event.data) ??
{ data: event.data };
ensureLink().postMessage(data, { transfer });
});
/**
* @param {Event} event
*/
const workerListener = async (event) => {
// Receive from worker
const msgEvent = /** @type {MessageEvent} */ (event);
const { data, transfer } = await hooks?.fromWorker?.(msgEvent.data) ??
{ data: msgEvent.data };
channel.port1.postMessage(data, { transfer });
};
channel.port1.start();
channel.port2.start();
return {
disconnect: () => {
link?.removeEventListener("message", workerListener);
channel.port1.close();
channel.port2.close();
},
port: channel.port2,
};
}
////////////////////////////////////////////
// RAW
////////////////////////////////////////////
/**
* @template T
* @param {string} name
* @param {T} args
* @param {MessagePort | Worker | MessengerRealm} [context] Uses `globalThis` by default.
*/
export function announce(
name,
args,
context,
) {
const a = announcement(name, args);
const transferables = getTransferables(a);
(context ?? globalThis).postMessage(a, { transfer: transferables });
}
/**
* @template T
* @param {string} name
* @param {(args: T) => void} fn
* @param {MessagePort | Worker | MessengerRealm} [context]
*/
export function listen(
name,
fn,
context = /** @type {MessengerRealm} */ (globalThis),
) {
const c = /** @type {any} */ (context);
if (!c.__incoming) {
context.addEventListener("message", incomingAnnouncementsHandler(context));
c.__incoming = {};
}
c.__incoming[name] = debounceMicrotask(fn, { updateArguments: true });
}
////////////////////////////////////////////
// RPC
////////////////////////////////////////////
/**
* @template {Record any>} LocalAPI
* @template {Record any>} RemoteAPI
* @param {MessagePort | Worker | MessengerRealm} context
* @param {RemoteAPI} actions
* @returns {RpcChannel<{}, RemoteAPI>}
*/
export function rpc(context, actions) {
/** @type {RpcChannel<{}, RemoteAPI>} */
const channel = new RpcChannel(context, { expose: actions });
return channel;
}
////////////////////////////////////////////
// ⛔️
////////////////////////////////////////////
const ANNOUNCEMENT = "announcement";
/**
* @template T
* @param {string} name
* @param {T} args
* @returns {Announcement}
*/
function announcement(name, args) {
return {
ns: ANNOUNCEMENT,
name,
key: xxh32(crypto.randomUUID()),
type: ANNOUNCEMENT,
args,
};
}
/**
* @param {MessagePort | Worker | MessengerRealm} context
*/
function incomingAnnouncementsHandler(context) {
/** @param {any} event */
return (event) => {
const { ns, type } = event.data;
if (ns !== ANNOUNCEMENT || type !== ANNOUNCEMENT) return;
const announcement = /** @type {Announcement} */ (event.data);
const c = /** @type {any} */ (context);
c.__incoming[announcement.name]?.(announcement.args);
};
}