Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at c659a33e27184efe98edaebcf77844b683fa887d 181 lines 6.2 kB view raw
import { setTimeout } from 'node:timers/promises';
import { Worker } from 'node:worker_threads';
import { availableParallelism } from 'node:os';
import { setupWorker, execute, dispatchStream } from './execute.ts';
import { AsyncIterableTask } from './stream-task.ts';
import { roundRobin } from './balancers.ts';
import type { ChannelOptions } from './channel.ts';
import type { Task, Balancer, Runner, WorkerHandle, WorkerOptions } from './runner.ts';

/** Entry module handed to every `Worker` created by the pool. */
const workerEntryUrl = new URL('./worker-entry.ts', import.meta.url);

/**
 * Creates a pool of worker threads with configurable load balancing.
 *
 * The returned runner is callable: pass a single task to get a promise, an
 * array of tasks to get a `Promise.all` over them, or an `AsyncIterableTask`
 * to get the iterable produced by `dispatchStream`. It also carries `signal`,
 * `workers`, `Symbol.dispose` and `Symbol.asyncDispose`.
 *
 * @param size - Number of worker threads, or options object. Defaults to `os.availableParallelism()`.
 * @param opts - Optional configuration including shutdown timeout and balancer.
 * @returns A disposable {@link Runner} for dispatching tasks.
 */
export function workers(): Runner;
export function workers(size: number, opts?: WorkerOptions): Runner;
export function workers(opts: WorkerOptions): Runner;
export function workers(sizeOrOpts?: number | WorkerOptions, opts?: WorkerOptions): Runner {
  // Normalise the overloads: an object in first position is the options bag.
  const options = typeof sizeOrOpts === 'object' ? sizeOrOpts : opts;
  const size =
    typeof sizeOrOpts === 'object' ? availableParallelism() : (sizeOrOpts ?? availableParallelism());

  const balancer: Balancer = options?.balance ?? roundRobin();

  const pool: Worker[] = [];
  for (let i = 0; i < size; i++) {
    const worker = new Worker(workerEntryUrl);
    setupWorker(worker);
    pool.push(worker);
  }

  let disposed = false;
  const ac = new AbortController();
  // Promises for work that has been dispatched and has not settled yet.
  const inflight = new Set<Promise<unknown>>();
  // Number of unsettled dispatches per handle; read back through `activeCount`.
  const activeCounts = new Map<WorkerHandle, number>();

  /** Registers `promise` as in-flight work running on `handle`. */
  function acquire(handle: WorkerHandle, promise: Promise<unknown>): void {
    inflight.add(promise);
    activeCounts.set(handle, (activeCounts.get(handle) ?? 0) + 1);
  }

  /** Undoes {@link acquire} once `promise` has settled. */
  function release(handle: WorkerHandle, promise: Promise<unknown>): void {
    inflight.delete(promise);
    activeCounts.set(handle, (activeCounts.get(handle) ?? 1) - 1);
  }

  /**
   * Builds the error that is rethrown to the caller for a failed task.
   *
   * `Error` instances are re-created with their own constructor and the same
   * message; anything else is stringified into a plain `Error`. The replacement
   * is constructed here, so its stack reflects this call site, and the original
   * value is always preserved as `cause`.
   */
  function rewrapError(err: unknown): Error {
    if (!(err instanceof Error)) {
      return new Error(String(err), { cause: err });
    }
    const Ctor = err.constructor as ErrorConstructor;
    try {
      return new Ctor(err.message, { cause: err });
    } catch {
      // A custom error class may not accept `(message, options)`; fall back to a
      // plain Error so the original failure is reported instead of a
      // constructor error that hides it.
      return new Error(err.message, { cause: err });
    }
  }

  /**
   * Tracks a value-returning dispatch for the lifetime of `promise`.
   *
   * @returns The value `promise` resolves to.
   * @throws The rejection reason of `promise`, re-created by `rewrapError`.
   */
  async function trackValue<T>(handle: WorkerHandle, promise: Promise<T>): Promise<T> {
    acquire(handle, promise);
    try {
      return await promise;
    } catch (err) {
      throw rewrapError(err);
    } finally {
      release(handle, promise);
    }
  }

  /**
   * Tracks a streaming dispatch until its `done` promise settles.
   *
   * The bookkeeping is released on rejection as well as on fulfilment, so a
   * failed stream cannot leave a stale in-flight entry or a permanently
   * elevated `activeCount`. The rejection handler also keeps this internal
   * chain from surfacing as an unhandled rejection.
   * NOTE(review): this assumes stream failures reach the consumer through the
   * iterable returned by `dispatchStream` — confirm against its implementation.
   */
  function trackStream(handle: WorkerHandle, done: Promise<void>): void {
    acquire(handle, done);
    const settle = (): void => release(handle, done);
    void done.then(settle, settle);
  }

  /** Terminates every worker thread and empties the pool. */
  function terminateAll(): void {
    for (const worker of pool) {
      worker.terminate();
    }
    pool.length = 0;
  }

  /**
   * Disposes the balancer from the synchronous disposal path, preferring
   * `Symbol.dispose`. If only `Symbol.asyncDispose` exists it is started but
   * cannot be awaited here.
   */
  function disposeBalancer(): void {
    if (Symbol.dispose in balancer) {
      balancer[Symbol.dispose]!();
    } else if (Symbol.asyncDispose in balancer) {
      void (balancer[Symbol.asyncDispose]! as () => Promise<void>)();
    }
  }

  /**
   * Disposes the balancer from the asynchronous disposal path, preferring
   * `Symbol.asyncDispose` and falling back to `Symbol.dispose`.
   */
  async function asyncDisposeBalancer(): Promise<void> {
    if (Symbol.asyncDispose in balancer) {
      await balancer[Symbol.asyncDispose]!();
    } else if (Symbol.dispose in balancer) {
      balancer[Symbol.dispose]!();
    }
  }

  /**
   * Picks the worker that should run `task`.
   *
   * A task pinned via `task.worker` to a handle of this pool runs there;
   * otherwise (including a pin to an unknown handle) the balancer decides.
   *
   * @throws Error if the balancer returns something that is not one of this
   *   pool's handles (which is also what happens when the pool is empty).
   */
  function resolveWorkerAndHandle(task: Task): { worker: Worker; handle: WorkerHandle } {
    if (task.worker != null) {
      const pinnedIdx = workerHandles.indexOf(task.worker);
      if (pinnedIdx !== -1) return { worker: pool[pinnedIdx], handle: workerHandles[pinnedIdx] };
    }
    const handle = balancer.select(workerHandles, task);
    const idx = workerHandles.indexOf(handle);
    if (idx === -1) {
      // Without this guard `pool[-1]` (undefined) would be passed on as the worker.
      throw new Error('Balancer selected a worker that is not part of this pool');
    }
    return { worker: pool[idx], handle };
  }

  /** Dispatches a value-returning task; rejects once the pool is disposed. */
  function dispatch<T>(task: Task): Promise<T> {
    if (disposed) return Promise.reject(new Error('Worker pool is disposed'));
    const { worker, handle } = resolveWorkerAndHandle(task);
    return trackValue(handle, execute<T>(worker, task.id, task.args));
  }

  /**
   * Creates the public handle for one worker. `exec` bypasses the balancer and
   * always runs on this worker; `thread` exposes the underlying `Worker`;
   * `activeCount` reports the number of unsettled dispatches on this handle.
   */
  function makeWorkerHandle(worker: Worker): WorkerHandle {
    const handle: WorkerHandle = {
      exec<T>(task: Task, channelOpts?: ChannelOptions): any {
        if (task instanceof AsyncIterableTask) {
          // Streaming calls fail synchronously; value calls fail with a rejection.
          if (disposed) throw new Error('Worker pool is disposed');
          const { iterable, done } = dispatchStream(worker, task.id, task.args, channelOpts);
          trackStream(handle, done);
          return iterable;
        }
        if (disposed) return Promise.reject(new Error('Worker pool is disposed'));
        return trackValue(handle, execute<T>(worker, task.id, task.args));
      },
      get thread() {
        return worker;
      },
      get activeCount() {
        return activeCounts.get(handle) ?? 0;
      },
    };
    return handle;
  }

  // `map` also passes index and array; the single-parameter callback ignores them.
  const workerHandles: readonly WorkerHandle[] = Object.freeze(pool.map((worker) => makeWorkerHandle(worker)));

  // Object.assign reads the getters below once and copies their current values
  // (and the two symbol-keyed dispose methods) onto the callable.
  const run: Runner = Object.assign(
    (taskOrTasks: Task | Task[] | (Task & AsyncIterable<any>), channelOpts?: ChannelOptions): any => {
      if (taskOrTasks instanceof AsyncIterableTask) {
        if (disposed) throw new Error('Worker pool is disposed');
        const { worker, handle } = resolveWorkerAndHandle(taskOrTasks);
        const { iterable, done } = dispatchStream(worker, taskOrTasks.id, taskOrTasks.args, channelOpts);
        trackStream(handle, done);
        return iterable;
      }
      if (Array.isArray(taskOrTasks)) {
        return Promise.all(taskOrTasks.map((t) => dispatch(t)));
      }
      return dispatch(taskOrTasks);
    },
    {
      /** Signal that is aborted when the pool is disposed. */
      get signal() {
        return ac.signal;
      },
      /** Frozen list of per-worker handles. */
      get workers() {
        return workerHandles;
      },
      /** Immediate shutdown: does not wait for in-flight work before terminating. */
      [Symbol.dispose]() {
        disposed = true;
        ac.abort();
        disposeBalancer();
        terminateAll();
      },
      /**
       * Graceful shutdown: stops accepting work, waits for in-flight work to
       * settle (for at most `shutdownTimeout` milliseconds when configured),
       * then disposes the balancer and terminates the workers.
       */
      async [Symbol.asyncDispose]() {
        disposed = true;
        ac.abort();
        const settle = Promise.allSettled(inflight);
        const timeoutMs = options?.shutdownTimeout;
        if (timeoutMs != null) {
          // Whichever finishes first wins; settling cancels the timer so it
          // does not keep running after the race is decided.
          const timeoutAc = new AbortController();
          await Promise.race([
            settle.finally(() => timeoutAc.abort()),
            setTimeout(timeoutMs, undefined, { signal: timeoutAc.signal }).catch(() => {}),
          ]);
        } else {
          await settle;
        }
        await asyncDisposeBalancer();
        terminateAll();
      },
    },
  );

  return run;
}