Offload functions to worker threads with shared memory primitives for Node.js.
1import { Worker } from 'node:worker_threads';
2import { setupWorker, execute, dispatchStream } from './execute.ts';
3
// Worker entry module that sits next to this file. The extension follows this
// module's own URL: `.ts` when this file is loaded as TypeScript source,
// `.js` otherwise.
const workerEntryFile = import.meta.url.endsWith('.ts')
  ? './worker-entry.ts'
  : './worker-entry.js';
const workerEntryUrl = new URL(workerEntryFile, import.meta.url);

// Dedicated workers, keyed by the id they were created for.
const workers: Map<string, Worker> = new Map();

// Number of outstanding holds (see `ref` / `unref`) per worker.
const activeCounts: Map<Worker, number> = new Map();
10
/**
 * Returns the dedicated worker registered under `id`, spawning it on first use.
 *
 * A freshly spawned worker is passed through `setupWorker` and then unref'd, so
 * that an idle worker does not by itself keep the process alive.
 *
 * When a worker exits (crash or termination) it is removed from the registry,
 * so the next call for the same `id` spawns a replacement instead of handing
 * back a dead thread.
 *
 * @param id - Key identifying the dedicated worker.
 * @returns The live worker associated with `id`.
 */
function getWorker(id: string): Worker {
  const existing = workers.get(id);
  if (existing) return existing;

  const worker = new Worker(workerEntryUrl);
  setupWorker(worker);
  worker.unref();

  worker.once('exit', () => {
    // Only evict our own registration; a replacement may already be in place.
    if (workers.get(id) === worker) workers.delete(id);
  });

  workers.set(id, worker);
  return worker;
}
21
/**
 * Registers one more outstanding hold on `worker`.
 *
 * The first hold on an idle worker calls `worker.ref()`; every call bumps the
 * per-worker counter in `activeCounts`.
 *
 * @param worker - Worker that is about to be used.
 */
function ref(worker: Worker): void {
  const outstanding = activeCounts.get(worker) ?? 0;
  const wasIdle = outstanding === 0;
  if (wasIdle) {
    worker.ref();
  }
  activeCounts.set(worker, outstanding + 1);
}
27
/**
 * Releases one outstanding hold on `worker`.
 *
 * While holds remain, only the counter is decremented. When the last hold is
 * released the worker's entry is dropped from `activeCounts` and
 * `worker.unref()` is called.
 *
 * The counter never goes below zero: an unbalanced extra release is treated as
 * "no holds left" rather than stored as a negative count, which would make the
 * next `ref()` skip its `worker.ref()` call.
 *
 * @param worker - Worker whose hold is being released.
 */
function unref(worker: Worker): void {
  const remaining = Math.max((activeCounts.get(worker) ?? 1) - 1, 0);
  if (remaining > 0) {
    activeCounts.set(worker, remaining);
    return;
  }
  // Last hold released: forget the worker so idle entries do not accumulate.
  activeCounts.delete(worker);
  worker.unref();
}
33
/**
 * Builds the error that `runOnDedicated` throws for a failed call.
 *
 * For an `Error`, a new instance of the same class is created with the same
 * message and the original attached as `cause`. If that class's constructor
 * rejects the `(message, options)` call shape, a plain `Error` carrying the
 * original `name` is used instead, so the real failure is not replaced by an
 * unrelated constructor exception. If the constructor ignored `options`, the
 * `cause` is attached afterwards.
 *
 * Non-`Error` values are stringified into a plain `Error` with the value as
 * `cause`.
 */
function rewrapTaskError(err: unknown): Error {
  if (!(err instanceof Error)) {
    return new Error(String(err), { cause: err });
  }

  let wrapped: Error | undefined;
  try {
    const Ctor = err.constructor as ErrorConstructor;
    const candidate: unknown = new Ctor(err.message, { cause: err });
    if (candidate instanceof Error) wrapped = candidate;
  } catch {
    // Constructor does not accept (message, options); use the fallback below.
  }

  if (!wrapped) {
    const fallback = new Error(err.message, { cause: err });
    fallback.name = err.name;
    return fallback;
  }

  // Subclasses that only forward `message` to `super` drop the options bag.
  if (!('cause' in wrapped) && Object.isExtensible(wrapped)) {
    Object.defineProperty(wrapped, 'cause', {
      value: err,
      writable: true,
      enumerable: false,
      configurable: true,
    });
  }
  return wrapped;
}

/**
 * Runs a task on the dedicated worker identified by `id` and resolves with its
 * result.
 *
 * The worker is held via `ref` for the duration of the call and released in
 * `finally`, whether the call succeeds or fails.
 *
 * @param id - Key of the dedicated worker; also forwarded to `execute`.
 * @param args - Arguments forwarded to `execute`.
 * @returns The value produced by `execute`.
 * @throws A re-created error (see `rewrapTaskError`) whose `cause` is the
 *   error or value that `execute` rejected with.
 */
export async function runOnDedicated<T>(id: string, args: unknown[]): Promise<T> {
  const worker = getWorker(id);
  ref(worker);
  try {
    return await execute<T>(worker, id, args);
  } catch (err) {
    throw rewrapTaskError(err);
  } finally {
    unref(worker);
  }
}
49
/**
 * Starts a streaming task on the dedicated worker identified by `id`.
 *
 * The worker is held via `ref` until the `done` promise returned by
 * `dispatchStream` settles, whether it fulfils or rejects. If `dispatchStream`
 * itself throws synchronously, the hold is released immediately and the error
 * propagates unchanged, so a failed dispatch cannot leave the worker ref'd.
 *
 * @param id - Key of the dedicated worker; also forwarded to `dispatchStream`.
 * @param args - Arguments forwarded to `dispatchStream`.
 * @returns The async iterable produced by `dispatchStream`.
 */
export function runStreamOnDedicated<T>(id: string, args: unknown[]): AsyncIterable<T> {
  const worker = getWorker(id);
  ref(worker);

  let releaseScheduled = false;
  try {
    const { iterable, done } = dispatchStream<T>(worker, id, args);
    const release = (): void => unref(worker);
    // Two-argument `then` so a rejected `done` is observed here and does not
    // surface as an unhandled rejection.
    done.then(release, release);
    releaseScheduled = true;
    return iterable;
  } finally {
    // Dispatch failed before a release could be scheduled: drop the hold now.
    if (!releaseScheduled) unref(worker);
  }
}