Offload functions to worker threads with shared memory primitives for Node.js.
1import { setTimeout } from 'node:timers/promises';
2import { Worker } from 'node:worker_threads';
3import { availableParallelism } from 'node:os';
4import { setupWorker, execute, dispatchStream } from './execute.ts';
5import { AsyncIterableTask } from './stream-task.ts';
6import { roundRobin } from './balancers.ts';
7import type { ChannelOptions } from './channel.ts';
8import type { Task, Balancer, Runner, WorkerHandle, WorkerOptions } from './runner.ts';
9
/**
 * URL of the worker bootstrap module, resolved relative to this module.
 * The extension mirrors this module's own: `.ts` when this file is loaded
 * from a `.ts` URL, `.js` otherwise.
 */
const workerEntryUrl = new URL(
  `./worker-entry.${import.meta.url.endsWith('.ts') ? 'ts' : 'js'}`,
  import.meta.url,
);
14
/**
 * Creates a pool of worker threads with configurable load balancing.
 *
 * Call forms:
 * - `workers()` — `os.availableParallelism()` threads with the default round-robin balancer.
 * - `workers(size, opts?)` — an explicit number of threads.
 * - `workers(opts)` — the default number of threads with custom options.
 *
 * The returned runner can be disposed in two ways:
 * - `Symbol.dispose` aborts {@link Runner.signal}, disposes the balancer and terminates
 *   every worker immediately, without waiting for in-flight work.
 * - `Symbol.asyncDispose` aborts the signal, waits for in-flight work to settle (for at most
 *   `opts.shutdownTimeout` milliseconds when that option is set), disposes the balancer and
 *   then terminates every worker.
 *
 * In both cases the workers are terminated even if disposing the balancer fails.
 *
 * @param size - Number of worker threads, or options object. Defaults to `os.availableParallelism()`.
 * @param opts - Optional configuration including shutdown timeout and balancer.
 * @returns A disposable {@link Runner} for dispatching tasks. After disposal, value tasks
 *   reject and streaming tasks throw with `Error('Worker pool is disposed')`.
 */
export function workers(): Runner;
export function workers(size: number, opts?: WorkerOptions): Runner;
export function workers(opts: WorkerOptions): Runner;
export function workers(sizeOrOpts?: number | WorkerOptions, opts?: WorkerOptions): Runner {
  // Normalise the overloads without reassigning the parameters.
  const optionsFirst = typeof sizeOrOpts === 'object';
  const options: WorkerOptions | undefined = optionsFirst ? sizeOrOpts : opts;
  const size: number = optionsFirst ? availableParallelism() : (sizeOrOpts ?? availableParallelism());

  const balancer: Balancer = options?.balance ?? roundRobin();

  const pool: Worker[] = [];
  for (let i = 0; i < size; i++) {
    const worker = new Worker(workerEntryUrl);
    setupWorker(worker);
    pool.push(worker);
  }

  let disposed = false;
  const ac = new AbortController();
  // Every promise that async disposal has to wait for.
  const inflight = new Set<Promise<unknown>>();
  // Number of unfinished tasks per handle; backs `WorkerHandle.activeCount`.
  const activeCounts = new Map<WorkerHandle, number>();

  /**
   * Tracks a value-returning task on `handle` until `promise` settles.
   * A rejection is re-thrown as a new error created with the original error's
   * constructor and message, carrying the original as `cause`; non-Error
   * rejections are wrapped in a plain `Error`.
   */
  async function trackValue<T>(handle: WorkerHandle, promise: Promise<T>): Promise<T> {
    inflight.add(promise);
    activeCounts.set(handle, (activeCounts.get(handle) ?? 0) + 1);
    try {
      return await promise;
    } catch (err) {
      if (err instanceof Error) {
        const Ctor = err.constructor as ErrorConstructor;
        throw new Ctor(err.message, { cause: err });
      }
      throw new Error(String(err), { cause: err });
    } finally {
      inflight.delete(promise);
      activeCounts.set(handle, (activeCounts.get(handle) ?? 1) - 1);
    }
  }

  /**
   * Tracks a streaming task on `handle` until its `done` promise settles.
   * Bookkeeping is released on fulfilment AND rejection: otherwise a failed
   * stream would stay in `inflight`, keep `activeCount` inflated forever, and
   * the derived promise would surface as an unhandled rejection.
   */
  function trackStream(handle: WorkerHandle, done: Promise<void>): void {
    inflight.add(done);
    activeCounts.set(handle, (activeCounts.get(handle) ?? 0) + 1);
    const release = (): void => {
      inflight.delete(done);
      activeCounts.set(handle, (activeCounts.get(handle) ?? 1) - 1);
    };
    done.then(release, release);
  }

  /** Terminates every worker thread and empties the pool. */
  function terminateAll(): void {
    for (const worker of pool) {
      worker.terminate();
    }
    pool.length = 0;
  }

  /** Synchronous balancer cleanup; prefers `Symbol.dispose` over `Symbol.asyncDispose`. */
  function disposeBalancer(): void {
    if (Symbol.dispose in balancer) {
      balancer[Symbol.dispose]!();
    } else if (Symbol.asyncDispose in balancer) {
      // Cannot be awaited from a synchronous dispose; started and intentionally not awaited.
      void (balancer[Symbol.asyncDispose]! as () => Promise<void>)();
    }
  }

  /** Asynchronous balancer cleanup; prefers `Symbol.asyncDispose` over `Symbol.dispose`. */
  async function asyncDisposeBalancer(): Promise<void> {
    if (Symbol.asyncDispose in balancer) {
      await balancer[Symbol.asyncDispose]!();
    } else if (Symbol.dispose in balancer) {
      balancer[Symbol.dispose]!();
    }
  }

  /** Builds the public handle for one worker thread. */
  function makeWorkerHandle(worker: Worker): WorkerHandle {
    const handle: WorkerHandle = {
      exec<T>(task: Task, channelOpts?: ChannelOptions): any {
        if (task instanceof AsyncIterableTask) {
          if (disposed) throw new Error('Worker pool is disposed');
          const { iterable, done } = dispatchStream(worker, task.id, task.args, channelOpts);
          trackStream(handle, done);
          return iterable;
        }
        if (disposed) return Promise.reject(new Error('Worker pool is disposed'));
        return trackValue(handle, execute<T>(worker, task.id, task.args));
      },
      get thread() {
        return worker;
      },
      get activeCount() {
        return activeCounts.get(handle) ?? 0;
      },
    };
    return handle;
  }

  // `workerHandles[i]` is the handle for `pool[i]`.
  const workerHandles: readonly WorkerHandle[] = Object.freeze(pool.map((worker) => makeWorkerHandle(worker)));

  /**
   * Picks the worker for `task`: the handle in `task.worker` when it belongs to
   * this pool, otherwise whatever the balancer selects.
   * @throws Error if the balancer returns a handle that is not part of this pool.
   */
  function resolveWorkerAndHandle(task: Task): { worker: Worker; handle: WorkerHandle } {
    if (task.worker != null) {
      const pinned = workerHandles.indexOf(task.worker);
      if (pinned !== -1) return { worker: pool[pinned], handle: workerHandles[pinned] };
    }
    const handle = balancer.select(workerHandles, task);
    const idx = workerHandles.indexOf(handle);
    if (idx === -1) {
      // Fail here with a clear message rather than passing `undefined` on as the worker.
      throw new Error('Balancer selected a worker that does not belong to this pool');
    }
    return { worker: pool[idx], handle };
  }

  /** Dispatches one value-returning task; rejects once the pool is disposed. */
  function dispatch<T>(task: Task): Promise<T> {
    if (disposed) return Promise.reject(new Error('Worker pool is disposed'));
    const { worker, handle } = resolveWorkerAndHandle(task);
    return trackValue(handle, execute<T>(worker, task.id, task.args));
  }

  const run: Runner = Object.assign(
    (taskOrTasks: Task | Task[] | (Task & AsyncIterable<any>), channelOpts?: ChannelOptions): any => {
      if (taskOrTasks instanceof AsyncIterableTask) {
        if (disposed) throw new Error('Worker pool is disposed');
        const { worker, handle } = resolveWorkerAndHandle(taskOrTasks);
        const { iterable, done } = dispatchStream(worker, taskOrTasks.id, taskOrTasks.args, channelOpts);
        trackStream(handle, done);
        return iterable;
      }
      if (Array.isArray(taskOrTasks)) {
        return Promise.all(taskOrTasks.map((t) => dispatch(t)));
      }
      return dispatch(taskOrTasks);
    },
    {
      get signal() {
        return ac.signal;
      },
      get workers() {
        return workerHandles;
      },
      [Symbol.dispose]() {
        disposed = true;
        ac.abort();
        try {
          disposeBalancer();
        } finally {
          // Always release the threads, even when the balancer's disposer throws.
          terminateAll();
        }
      },
      async [Symbol.asyncDispose]() {
        disposed = true;
        ac.abort();
        try {
          const settle = Promise.allSettled(inflight);
          if (options?.shutdownTimeout != null) {
            // Race the drain against the timeout; whichever finishes first wins,
            // and a completed drain cancels the pending timer.
            const timeoutAc = new AbortController();
            await Promise.race([
              settle.finally(() => timeoutAc.abort()),
              setTimeout(options.shutdownTimeout, undefined, { signal: timeoutAc.signal }).catch(() => {}),
            ]);
          } else {
            await settle;
          }
          await asyncDisposeBalancer();
        } finally {
          // Always release the threads, even when the balancer's disposer rejects.
          terminateAll();
        }
      },
    },
  );

  return run;
}