Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at e46aca05dbf9a2204886d23f7ae3b34883f4e5c6 167 lines 5.7 kB view raw
1import { setTimeout } from 'node:timers/promises'; 2import { Worker } from 'node:worker_threads'; 3import { availableParallelism } from 'node:os'; 4import { setupWorker, execute, dispatchStream } from './execute.ts'; 5import { AsyncIterableTask } from './stream-task.ts'; 6import { roundRobin } from './balancers.ts'; 7import type { ChannelOptions } from './channel.ts'; 8import type { Task, Balancer, Runner, WorkerHandle, WorkerOptions } from './runner.ts'; 9 10const workerEntryUrl = new URL('./worker-entry.ts', import.meta.url); 11 12/** 13 * Creates a pool of worker threads with configurable load balancing. 14 * @param size - Number of worker threads, or options object. Defaults to `os.availableParallelism()`. 15 * @param opts - Optional configuration including shutdown timeout and balancer. 16 * @returns A disposable {@link Runner} for dispatching tasks. 17 */ 18export function workers(): Runner; 19export function workers(size: number, opts?: WorkerOptions): Runner; 20export function workers(opts: WorkerOptions): Runner; 21export function workers(sizeOrOpts?: number | WorkerOptions, opts?: WorkerOptions): Runner { 22 let size: number; 23 if (typeof sizeOrOpts === 'object') { 24 opts = sizeOrOpts; 25 size = availableParallelism(); 26 } else { 27 size = sizeOrOpts ?? availableParallelism(); 28 } 29 30 const balancer: Balancer = opts?.balance ?? roundRobin(); 31 32 const pool: Worker[] = []; 33 for (let i = 0; i < size; i++) { 34 const worker = new Worker(workerEntryUrl); 35 setupWorker(worker); 36 pool.push(worker); 37 } 38 39 let disposed = false; 40 const ac = new AbortController(); 41 const inflight = new Set<Promise<unknown>>(); 42 const activeCounts = new Map<WorkerHandle, number>(); 43 44 function track<T>(handle: WorkerHandle, promise: Promise<T>): Promise<T> { 45 inflight.add(promise); 46 activeCounts.set(handle, (activeCounts.get(handle) ?? 0) + 1); 47 promise 48 .finally(() => { 49 inflight.delete(promise); 50 activeCounts.set(handle, (activeCounts.get(handle) ?? 1) - 1); 51 }) 52 .catch(() => {}); 53 return promise; 54 } 55 56 function terminateAll(): void { 57 for (const worker of pool) { 58 worker.terminate(); 59 } 60 pool.length = 0; 61 } 62 63 function disposeBalancer(): void { 64 if (Symbol.dispose in balancer) { 65 balancer[Symbol.dispose]!(); 66 } else if (Symbol.asyncDispose in balancer) { 67 (balancer[Symbol.asyncDispose]! as () => Promise<void>)(); 68 } 69 } 70 71 async function asyncDisposeBalancer(): Promise<void> { 72 if (Symbol.asyncDispose in balancer) { 73 await balancer[Symbol.asyncDispose]!(); 74 } else if (Symbol.dispose in balancer) { 75 balancer[Symbol.dispose]!(); 76 } 77 } 78 79 function resolveWorkerAndHandle(task: Task): { worker: Worker; handle: WorkerHandle } { 80 if (task.worker != null) { 81 const idx = workerHandles.indexOf(task.worker); 82 if (idx !== -1) return { worker: pool[idx], handle: workerHandles[idx] }; 83 } 84 const handle = balancer.select(workerHandles, task); 85 const idx = workerHandles.indexOf(handle); 86 return { worker: pool[idx], handle }; 87 } 88 89 function dispatch<T>(task: Task): Promise<T> { 90 if (disposed) return Promise.reject(new Error('Worker pool is disposed')); 91 const { worker, handle } = resolveWorkerAndHandle(task); 92 return track(handle, execute<T>(worker, task.id, task.args)); 93 } 94 95 function makeWorkerHandle(worker: Worker, idx: number): WorkerHandle { 96 let handle: WorkerHandle; 97 handle = { 98 exec<T>(task: Task, channelOpts?: ChannelOptions): any { 99 if (task instanceof AsyncIterableTask) { 100 if (disposed) throw new Error('Worker pool is disposed'); 101 const { iterable, done } = dispatchStream(worker, task.id, task.args, channelOpts); 102 track(handle, done); 103 return iterable; 104 } 105 if (disposed) return Promise.reject(new Error('Worker pool is disposed')); 106 return track(handle, execute<T>(worker, task.id, task.args)); 107 }, 108 get thread() { 109 return worker; 110 }, 111 get activeCount() { 112 return activeCounts.get(handle) ?? 0; 113 }, 114 }; 115 return handle; 116 } 117 118 const workerHandles: readonly WorkerHandle[] = Object.freeze(pool.map(makeWorkerHandle)); 119 120 const run: Runner = Object.assign( 121 (taskOrTasks: Task | Task[] | (Task & AsyncIterable<any>), channelOpts?: ChannelOptions): any => { 122 if (taskOrTasks instanceof AsyncIterableTask) { 123 if (disposed) throw new Error('Worker pool is disposed'); 124 const { worker, handle } = resolveWorkerAndHandle(taskOrTasks); 125 const { iterable, done } = dispatchStream(worker, taskOrTasks.id, taskOrTasks.args, channelOpts); 126 track(handle, done); 127 return iterable; 128 } 129 if (Array.isArray(taskOrTasks)) { 130 return Promise.all(taskOrTasks.map((t) => dispatch(t))); 131 } 132 return dispatch(taskOrTasks); 133 }, 134 { 135 get signal() { 136 return ac.signal; 137 }, 138 get workers() { 139 return workerHandles; 140 }, 141 [Symbol.dispose]() { 142 disposed = true; 143 ac.abort(); 144 disposeBalancer(); 145 terminateAll(); 146 }, 147 async [Symbol.asyncDispose]() { 148 disposed = true; 149 ac.abort(); 150 const settle = Promise.allSettled(inflight); 151 if (opts?.shutdownTimeout != null) { 152 const timeoutAc = new AbortController(); 153 await Promise.race([ 154 settle.finally(() => timeoutAc.abort()), 155 setTimeout(opts.shutdownTimeout, undefined, { signal: timeoutAc.signal }).catch(() => {}), 156 ]); 157 } else { 158 await settle; 159 } 160 await asyncDisposeBalancer(); 161 terminateAll(); 162 }, 163 }, 164 ); 165 166 return run; 167}