Offload functions to worker threads with shared memory primitives for Node.js.
1import { setTimeout } from 'node:timers/promises';
2import { Worker } from 'node:worker_threads';
3import { availableParallelism } from 'node:os';
4import { setupWorker, execute, dispatchStream } from './execute.ts';
5import { AsyncIterableTask } from './stream-task.ts';
6import { roundRobin } from './balancers.ts';
7import type { ChannelOptions } from './channel.ts';
8import type { Task, Balancer, Runner, WorkerHandle, WorkerOptions } from './runner.ts';
9
/** URL of the worker entry module (`./worker-entry.ts`), resolved relative to this module's own URL. */
const workerEntryUrl = new URL('./worker-entry.ts', import.meta.url);
11
/**
 * Creates a pool of worker threads with configurable load balancing.
 *
 * All workers are spawned eagerly from the worker entry module and passed through
 * `setupWorker`. A task is routed to the handle named by `task.worker` when that
 * handle belongs to this pool; otherwise the balancer (`opts.balance`, defaulting
 * to `roundRobin()`) picks one.
 *
 * Disposal:
 * - `Symbol.dispose` aborts `signal`, disposes the balancer and terminates every
 *   worker immediately, without waiting for in-flight work.
 * - `Symbol.asyncDispose` aborts `signal`, waits for in-flight work to settle (for
 *   at most `opts.shutdownTimeout` milliseconds when set), disposes the balancer,
 *   then terminates the workers and waits for the threads to exit.
 * Workers are terminated even if disposing the balancer throws, and the balancer
 * is disposed at most once.
 *
 * @param size - Number of worker threads, or options object. Defaults to `os.availableParallelism()`.
 * @param opts - Optional configuration including shutdown timeout and balancer.
 * @returns A disposable {@link Runner} for dispatching tasks.
 * @throws Error - when a streaming task is dispatched after disposal (value tasks
 *   reject instead), or when no pool worker matches the balancer's selection.
 */
export function workers(): Runner;
export function workers(size: number, opts?: WorkerOptions): Runner;
export function workers(opts: WorkerOptions): Runner;
export function workers(sizeOrOpts?: number | WorkerOptions, opts?: WorkerOptions): Runner {
  // `workers(opts)` form: the first argument carries the options and the size is defaulted.
  const options = typeof sizeOrOpts === 'object' ? sizeOrOpts : opts;
  const requestedSize = typeof sizeOrOpts === 'object' ? undefined : sizeOrOpts;
  const size = requestedSize ?? availableParallelism();

  const balancer: Balancer = options?.balance ?? roundRobin();

  const pool: Worker[] = [];
  for (let i = 0; i < size; i++) {
    const worker = new Worker(workerEntryUrl);
    setupWorker(worker);
    pool.push(worker);
  }

  let disposed = false;
  let balancerDisposed = false;
  const ac = new AbortController();
  const inflight = new Set<Promise<unknown>>();
  const activeCounts = new Map<WorkerHandle, number>();

  /** Registers `promise` as in-flight work running on `handle`. */
  function beginWork(handle: WorkerHandle, promise: Promise<unknown>): void {
    inflight.add(promise);
    activeCounts.set(handle, (activeCounts.get(handle) ?? 0) + 1);
  }

  /** Reverses {@link beginWork} once the work has settled. */
  function endWork(handle: WorkerHandle, promise: Promise<unknown>): void {
    inflight.delete(promise);
    activeCounts.set(handle, (activeCounts.get(handle) ?? 1) - 1);
  }

  /**
   * Tracks a value-returning task for the lifetime of `promise` and forwards its
   * result. Failures are re-thrown as a fresh error whose `cause` is the original.
   */
  async function trackValue<T>(handle: WorkerHandle, promise: Promise<T>): Promise<T> {
    beginWork(handle, promise);
    try {
      return await promise;
    } catch (err) {
      throw rewrapError(err);
    } finally {
      endWork(handle, promise);
    }
  }

  /** Tracks a streaming task until its `done` promise settles. */
  function trackStream(handle: WorkerHandle, done: Promise<void>): void {
    beginWork(handle, done);
    // `finally` (not `then`) so the bookkeeping is released when `done` rejects as
    // well; otherwise a failed stream would leave `activeCount` permanently inflated.
    // A rejection still propagates through the derived promise unchanged.
    void done.finally(() => endWork(handle, done));
  }

  /**
   * Requests termination of every worker and empties the pool.
   * @returns A promise that settles (never rejects) once all terminations have settled.
   */
  function terminateAll(): Promise<unknown> {
    const exits = pool.map((worker) => worker.terminate());
    pool.length = 0;
    return Promise.allSettled(exits);
  }

  /** Synchronous balancer cleanup; an async-only disposer is started but not awaited. */
  function disposeBalancer(): void {
    if (balancerDisposed) return;
    balancerDisposed = true;
    if (Symbol.dispose in balancer) {
      balancer[Symbol.dispose]!();
    } else if (Symbol.asyncDispose in balancer) {
      // A synchronous dispose cannot wait for the async disposer to finish.
      void (balancer[Symbol.asyncDispose]! as () => Promise<void>)();
    }
  }

  /** Asynchronous balancer cleanup; prefers the async disposer, falls back to the sync one. */
  async function asyncDisposeBalancer(): Promise<void> {
    if (balancerDisposed) return;
    balancerDisposed = true;
    if (Symbol.asyncDispose in balancer) {
      await balancer[Symbol.asyncDispose]!();
    } else if (Symbol.dispose in balancer) {
      balancer[Symbol.dispose]!();
    }
  }

  /**
   * Picks the worker for `task`: the handle pinned via `task.worker` if it belongs
   * to this pool, otherwise whatever the balancer selects.
   * @throws Error when the selected handle has no matching worker in this pool
   *   (for example when the pool is empty).
   */
  function resolveWorkerAndHandle(task: Task): { worker: Worker; handle: WorkerHandle } {
    const pinned = task.worker;
    const handle =
      pinned != null && workerHandles.includes(pinned) ? pinned : balancer.select(workerHandles, task);
    const worker = pool[workerHandles.indexOf(handle)];
    if (worker === undefined) {
      throw new Error('No worker available: the balancer selected a handle that is not part of this pool');
    }
    return { worker, handle };
  }

  /** Starts a streaming task on `worker` and returns its iterable. */
  function startStream(worker: Worker, handle: WorkerHandle, task: Task, channelOpts?: ChannelOptions) {
    const { iterable, done } = dispatchStream(worker, task.id, task.args, channelOpts);
    trackStream(handle, done);
    return iterable;
  }

  /** Dispatches a single value-returning task through the balancer. */
  function dispatch<T>(task: Task): Promise<T> {
    if (disposed) return Promise.reject(new Error('Worker pool is disposed'));
    const { worker, handle } = resolveWorkerAndHandle(task);
    return trackValue(handle, execute<T>(worker, task.id, task.args));
  }

  /** Builds the public handle through which callers can target `worker` directly. */
  function makeWorkerHandle(worker: Worker): WorkerHandle {
    const handle: WorkerHandle = {
      exec<T>(task: Task, channelOpts?: ChannelOptions): any {
        if (task instanceof AsyncIterableTask) {
          // Streaming calls fail synchronously; value calls fail via rejection.
          if (disposed) throw new Error('Worker pool is disposed');
          return startStream(worker, handle, task, channelOpts);
        }
        if (disposed) return Promise.reject(new Error('Worker pool is disposed'));
        return trackValue(handle, execute<T>(worker, task.id, task.args));
      },
      get thread() {
        return worker;
      },
      get activeCount() {
        return activeCounts.get(handle) ?? 0;
      },
    };
    return handle;
  }

  const workerHandles: readonly WorkerHandle[] = Object.freeze(pool.map((worker) => makeWorkerHandle(worker)));

  const run: Runner = Object.assign(
    (taskOrTasks: Task | Task[] | (Task & AsyncIterable<any>), channelOpts?: ChannelOptions): any => {
      if (taskOrTasks instanceof AsyncIterableTask) {
        if (disposed) throw new Error('Worker pool is disposed');
        const { worker, handle } = resolveWorkerAndHandle(taskOrTasks);
        return startStream(worker, handle, taskOrTasks, channelOpts);
      }
      if (Array.isArray(taskOrTasks)) {
        return Promise.all(taskOrTasks.map((t) => dispatch(t)));
      }
      return dispatch(taskOrTasks);
    },
    {
      // Plain properties: `Object.assign` reads each source property once and copies
      // the value, so accessors here would be snapshotted to these same values anyway.
      signal: ac.signal,
      workers: workerHandles,
      [Symbol.dispose]() {
        disposed = true;
        ac.abort();
        try {
          disposeBalancer();
        } finally {
          // Terminate even if the balancer's disposer throws, so no thread is leaked.
          void terminateAll();
        }
      },
      async [Symbol.asyncDispose]() {
        disposed = true;
        ac.abort();
        try {
          const settle = Promise.allSettled(inflight);
          if (options?.shutdownTimeout != null) {
            const timeoutAc = new AbortController();
            await Promise.race([
              // Cancel the timer as soon as all in-flight work has settled.
              settle.finally(() => timeoutAc.abort()),
              // The timer rejects when aborted; that rejection is expected.
              setTimeout(options.shutdownTimeout, undefined, { signal: timeoutAc.signal }).catch(() => {}),
            ]);
          } else {
            await settle;
          }
          await asyncDisposeBalancer();
        } finally {
          // Always terminate, and wait for the threads to actually exit.
          await terminateAll();
        }
      },
    },
  );

  return run;
}

/**
 * Re-creates a failure as a new error of the same class, with the original attached
 * as `cause`. Non-`Error` values are stringified into a plain `Error`.
 *
 * If the error's constructor cannot be called as `(message, { cause })` and throws,
 * a plain `Error` is used instead so the original failure is not masked.
 * NOTE(review): presumably the re-wrap exists so the stack points at the dispatching
 * side rather than the worker boundary — confirm.
 */
function rewrapError(err: unknown): Error {
  if (!(err instanceof Error)) {
    return new Error(String(err), { cause: err });
  }
  try {
    const Ctor = err.constructor as ErrorConstructor;
    return new Ctor(err.message, { cause: err });
  } catch {
    return new Error(err.message, { cause: err });
  }
}