Offload functions to worker threads with shared memory primitives for Node.js.
1import { setTimeout } from 'node:timers/promises';
2import { Worker } from 'node:worker_threads';
3import { availableParallelism } from 'node:os';
4import { setupWorker, execute, dispatchStream } from './execute.ts';
5import { AsyncIterableTask } from './stream-task.ts';
6import { roundRobin } from './balancers.ts';
7import type { ChannelOptions } from './channel.ts';
8import type { Task, Balancer, Runner, WorkerHandle, WorkerOptions } from './runner.ts';
9
10const workerEntryUrl = new URL('./worker-entry.ts', import.meta.url);
11
12/**
13 * Creates a pool of worker threads with configurable load balancing.
14 * @param size - Number of worker threads, or options object. Defaults to `os.availableParallelism()`.
15 * @param opts - Optional configuration including shutdown timeout and balancer.
16 * @returns A disposable {@link Runner} for dispatching tasks.
17 */
18export function workers(): Runner;
19export function workers(size: number, opts?: WorkerOptions): Runner;
20export function workers(opts: WorkerOptions): Runner;
21export function workers(sizeOrOpts?: number | WorkerOptions, opts?: WorkerOptions): Runner {
22 let size: number;
23 if (typeof sizeOrOpts === 'object') {
24 opts = sizeOrOpts;
25 size = availableParallelism();
26 } else {
27 size = sizeOrOpts ?? availableParallelism();
28 }
29
30 const balancer: Balancer = opts?.balance ?? roundRobin();
31
32 const pool: Worker[] = [];
33 for (let i = 0; i < size; i++) {
34 const worker = new Worker(workerEntryUrl);
35 setupWorker(worker);
36 pool.push(worker);
37 }
38
39 let disposed = false;
40 const ac = new AbortController();
41 const inflight = new Set<Promise<unknown>>();
42 const activeCounts = new Map<WorkerHandle, number>();
43
44 function track<T>(handle: WorkerHandle, promise: Promise<T>): Promise<T> {
45 inflight.add(promise);
46 activeCounts.set(handle, (activeCounts.get(handle) ?? 0) + 1);
47 promise
48 .finally(() => {
49 inflight.delete(promise);
50 activeCounts.set(handle, (activeCounts.get(handle) ?? 1) - 1);
51 })
52 .catch(() => {});
53 return promise;
54 }
55
56 function terminateAll(): void {
57 for (const worker of pool) {
58 worker.terminate();
59 }
60 pool.length = 0;
61 }
62
63 function disposeBalancer(): void {
64 if (Symbol.dispose in balancer) {
65 balancer[Symbol.dispose]!();
66 } else if (Symbol.asyncDispose in balancer) {
67 (balancer[Symbol.asyncDispose]! as () => Promise<void>)();
68 }
69 }
70
71 async function asyncDisposeBalancer(): Promise<void> {
72 if (Symbol.asyncDispose in balancer) {
73 await balancer[Symbol.asyncDispose]!();
74 } else if (Symbol.dispose in balancer) {
75 balancer[Symbol.dispose]!();
76 }
77 }
78
79 function resolveWorkerAndHandle(task: Task): { worker: Worker; handle: WorkerHandle } {
80 if (task.worker != null) {
81 const idx = workerHandles.indexOf(task.worker);
82 if (idx !== -1) return { worker: pool[idx], handle: workerHandles[idx] };
83 }
84 const handle = balancer.select(workerHandles, task);
85 const idx = workerHandles.indexOf(handle);
86 return { worker: pool[idx], handle };
87 }
88
89 function dispatch<T>(task: Task): Promise<T> {
90 if (disposed) return Promise.reject(new Error('Worker pool is disposed'));
91 const { worker, handle } = resolveWorkerAndHandle(task);
92 return track(handle, execute<T>(worker, task.id, task.args));
93 }
94
95 function makeWorkerHandle(worker: Worker, idx: number): WorkerHandle {
96 let handle: WorkerHandle;
97 handle = {
98 exec<T>(task: Task, channelOpts?: ChannelOptions): any {
99 if (task instanceof AsyncIterableTask) {
100 if (disposed) throw new Error('Worker pool is disposed');
101 const { iterable, done } = dispatchStream(worker, task.id, task.args, channelOpts);
102 track(handle, done);
103 return iterable;
104 }
105 if (disposed) return Promise.reject(new Error('Worker pool is disposed'));
106 return track(handle, execute<T>(worker, task.id, task.args));
107 },
108 get thread() {
109 return worker;
110 },
111 get activeCount() {
112 return activeCounts.get(handle) ?? 0;
113 },
114 };
115 return handle;
116 }
117
118 const workerHandles: readonly WorkerHandle[] = Object.freeze(pool.map(makeWorkerHandle));
119
120 const run: Runner = Object.assign(
121 (taskOrTasks: Task | Task[] | (Task & AsyncIterable<any>), channelOpts?: ChannelOptions): any => {
122 if (taskOrTasks instanceof AsyncIterableTask) {
123 if (disposed) throw new Error('Worker pool is disposed');
124 const { worker, handle } = resolveWorkerAndHandle(taskOrTasks);
125 const { iterable, done } = dispatchStream(worker, taskOrTasks.id, taskOrTasks.args, channelOpts);
126 track(handle, done);
127 return iterable;
128 }
129 if (Array.isArray(taskOrTasks)) {
130 return Promise.all(taskOrTasks.map((t) => dispatch(t)));
131 }
132 return dispatch(taskOrTasks);
133 },
134 {
135 get signal() {
136 return ac.signal;
137 },
138 get workers() {
139 return workerHandles;
140 },
141 [Symbol.dispose]() {
142 disposed = true;
143 ac.abort();
144 disposeBalancer();
145 terminateAll();
146 },
147 async [Symbol.asyncDispose]() {
148 disposed = true;
149 ac.abort();
150 const settle = Promise.allSettled(inflight);
151 if (opts?.shutdownTimeout != null) {
152 const timeoutAc = new AbortController();
153 await Promise.race([
154 settle.finally(() => timeoutAc.abort()),
155 setTimeout(opts.shutdownTimeout, undefined, { signal: timeoutAc.signal }).catch(() => {}),
156 ]);
157 } else {
158 await settle;
159 }
160 await asyncDisposeBalancer();
161 terminateAll();
162 },
163 },
164 );
165
166 return run;
167}