import { setTimeout } from 'node:timers/promises';
import { Worker } from 'node:worker_threads';
import { availableParallelism } from 'node:os';
import { setupWorker, execute, dispatchStream } from './execute.ts';
import { AsyncIterableTask } from './stream-task.ts';
import { roundRobin } from './balancers.ts';
import type { ChannelOptions } from './channel.ts';
import type { Task, Balancer, Runner, WorkerHandle, WorkerOptions } from './runner.ts';

// The worker entry is resolved relative to this module and uses the same
// extension as this module's own URL (`.ts` or `.js`).
const workerEntryUrl = new URL(
  import.meta.url.endsWith('.ts') ? './worker-entry.ts' : './worker-entry.js',
  import.meta.url,
);

/**
 * Creates a pool of worker threads with configurable load balancing.
 *
 * A task whose `worker` property is one of this pool's handles runs on that
 * worker; every other task is routed through the balancer. Errors from value
 * tasks are re-thrown as a new error of the same constructor, with the
 * original error attached as `cause`.
 *
 * Disposal:
 * - `Symbol.dispose` aborts `signal`, disposes the balancer and terminates all
 *   workers immediately.
 * - `Symbol.asyncDispose` aborts `signal`, waits for in-flight work to settle
 *   (bounded by `opts.shutdownTimeout` when it is set), then disposes the
 *   balancer and terminates all workers.
 *
 * @param size - Number of worker threads, or options object. Defaults to `os.availableParallelism()`.
 * @param opts - Optional configuration including shutdown timeout and balancer.
 * @returns A disposable {@link Runner} for dispatching tasks.
 * @throws Error when a task is dispatched and the balancer selects a handle
 *   that is not part of this pool.
 */
export function workers(): Runner;
export function workers(size: number, opts?: WorkerOptions): Runner;
export function workers(opts: WorkerOptions): Runner;
export function workers(sizeOrOpts?: number | WorkerOptions, opts?: WorkerOptions): Runner {
  let size: number;
  if (typeof sizeOrOpts === 'object') {
    // Options-only call form: the first argument carries the options.
    opts = sizeOrOpts;
    size = availableParallelism();
  } else {
    size = sizeOrOpts ?? availableParallelism();
  }

  const balancer: Balancer = opts?.balance ?? roundRobin();

  const pool: Worker[] = [];
  for (let i = 0; i < size; i++) {
    const worker = new Worker(workerEntryUrl);
    setupWorker(worker);
    pool.push(worker);
  }

  let disposed = false;
  const ac = new AbortController();
  // Promises of work that has been dispatched and has not settled yet.
  const inflight = new Set<Promise<unknown>>();
  // Number of unsettled tasks per worker handle (exposed as `activeCount`).
  const activeCounts = new Map<WorkerHandle, number>();

  /**
   * Tracks a value-returning task until it settles and returns its result.
   * Failures are re-thrown wrapped so the original error is kept as `cause`.
   */
  async function trackValue<T>(handle: WorkerHandle, promise: Promise<T>): Promise<T> {
    inflight.add(promise);
    activeCounts.set(handle, (activeCounts.get(handle) ?? 0) + 1);
    try {
      return await promise;
    } catch (err) {
      if (err instanceof Error) {
        // Re-create the error with the same constructor and chain the original.
        const Ctor = err.constructor as ErrorConstructor;
        throw new Ctor(err.message, { cause: err });
      }
      throw new Error(String(err), { cause: err });
    } finally {
      inflight.delete(promise);
      activeCounts.set(handle, (activeCounts.get(handle) ?? 1) - 1);
    }
  }

  /**
   * Tracks a streaming task until its completion promise settles.
   * Bookkeeping is released on fulfilment AND rejection, so a failed stream
   * neither leaks an `inflight` entry nor leaves `activeCount` inflated.
   */
  function trackStream(handle: WorkerHandle, done: Promise<unknown>): void {
    inflight.add(done);
    activeCounts.set(handle, (activeCounts.get(handle) ?? 0) + 1);
    const release = (): void => {
      inflight.delete(done);
      activeCounts.set(handle, (activeCounts.get(handle) ?? 1) - 1);
    };
    // Passing `release` as the rejection handler as well keeps the derived
    // promise from becoming an unhandled rejection; `done` itself is untouched.
    done.then(release, release);
  }

  /** Terminates every worker thread and empties the pool. */
  function terminateAll(): void {
    for (const worker of pool) {
      worker.terminate();
    }
    pool.length = 0;
  }

  /** Disposes the balancer from a synchronous context, preferring `Symbol.dispose`. */
  function disposeBalancer(): void {
    if (Symbol.dispose in balancer) {
      balancer[Symbol.dispose]!();
    } else if (Symbol.asyncDispose in balancer) {
      // Cannot be awaited from synchronous disposal; started and intentionally not awaited.
      void (balancer[Symbol.asyncDispose]! as () => Promise<void>)();
    }
  }

  /** Disposes the balancer from an asynchronous context, preferring `Symbol.asyncDispose`. */
  async function asyncDisposeBalancer(): Promise<void> {
    if (Symbol.asyncDispose in balancer) {
      await balancer[Symbol.asyncDispose]!();
    } else if (Symbol.dispose in balancer) {
      balancer[Symbol.dispose]!();
    }
  }

  /**
   * Picks the worker for a task: the task's own `worker` handle when it
   * belongs to this pool, otherwise whatever the balancer selects.
   */
  function resolveWorkerAndHandle(task: Task): { worker: Worker; handle: WorkerHandle } {
    if (task.worker != null) {
      const pinnedIdx = workerHandles.indexOf(task.worker);
      if (pinnedIdx !== -1) return { worker: pool[pinnedIdx], handle: workerHandles[pinnedIdx] };
    }
    const handle = balancer.select(workerHandles, task);
    const idx = workerHandles.indexOf(handle);
    if (idx === -1) {
      // Without this guard `pool[-1]` would hand `undefined` to the executor.
      throw new Error('Balancer selected a worker that is not part of this pool');
    }
    return { worker: pool[idx], handle };
  }

  /** Dispatches a single value-returning task; rejects once the pool is disposed. */
  function dispatch(task: Task): Promise<unknown> {
    if (disposed) return Promise.reject(new Error('Worker pool is disposed'));
    const { worker, handle } = resolveWorkerAndHandle(task);
    return trackValue(handle, execute(worker, task.id, task.args));
  }

  /** Builds the public handle for one worker; `exec` bypasses the balancer. */
  function makeWorkerHandle(worker: Worker): WorkerHandle {
    const handle: WorkerHandle = {
      exec(task: Task, channelOpts?: ChannelOptions): any {
        if (task instanceof AsyncIterableTask) {
          // Streaming tasks fail synchronously after disposal; value tasks reject.
          if (disposed) throw new Error('Worker pool is disposed');
          const { iterable, done } = dispatchStream(worker, task.id, task.args, channelOpts);
          trackStream(handle, done);
          return iterable;
        }
        if (disposed) return Promise.reject(new Error('Worker pool is disposed'));
        return trackValue(handle, execute(worker, task.id, task.args));
      },
      get thread() {
        return worker;
      },
      get activeCount() {
        return activeCounts.get(handle) ?? 0;
      },
    };
    return handle;
  }

  const workerHandles: readonly WorkerHandle[] = Object.freeze(pool.map((worker) => makeWorkerHandle(worker)));

  const run: Runner = Object.assign(
    (taskOrTasks: Task | Task[] | (Task & AsyncIterable<unknown>), channelOpts?: ChannelOptions): any => {
      if (taskOrTasks instanceof AsyncIterableTask) {
        if (disposed) throw new Error('Worker pool is disposed');
        const { worker, handle } = resolveWorkerAndHandle(taskOrTasks);
        const { iterable, done } = dispatchStream(worker, taskOrTasks.id, taskOrTasks.args, channelOpts);
        trackStream(handle, done);
        return iterable;
      }
      if (Array.isArray(taskOrTasks)) {
        return Promise.all(taskOrTasks.map((t) => dispatch(t)));
      }
      return dispatch(taskOrTasks);
    },
    {
      // `Object.assign` reads these getters once and copies the resulting
      // values onto the runner function.
      get signal() {
        return ac.signal;
      },
      get workers() {
        return workerHandles;
      },
      [Symbol.dispose]() {
        disposed = true;
        ac.abort();
        disposeBalancer();
        terminateAll();
      },
      async [Symbol.asyncDispose]() {
        disposed = true;
        ac.abort();
        const settle = Promise.allSettled(inflight);
        if (opts?.shutdownTimeout != null) {
          // Race the drain against the timeout; whichever way it ends, the
          // timer is cancelled through `timeoutAc` so it cannot linger.
          const timeoutAc = new AbortController();
          await Promise.race([
            settle.finally(() => timeoutAc.abort()),
            setTimeout(opts.shutdownTimeout, undefined, { signal: timeoutAc.signal }).catch(() => {}),
          ]);
        } else {
          await settle;
        }
        await asyncDisposeBalancer();
        terminateAll();
      },
    },
  );

  return run;
}