Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 184 lines 6.3 kB view raw
import { setTimeout } from 'node:timers/promises';
import { Worker } from 'node:worker_threads';
import { availableParallelism } from 'node:os';
import { setupWorker, execute, dispatchStream } from './execute.ts';
import { AsyncIterableTask } from './stream-task.ts';
import { roundRobin } from './balancers.ts';
import type { ChannelOptions } from './channel.ts';
import type { Task, Balancer, Runner, WorkerHandle, WorkerOptions } from './runner.ts';

// Worker entry module, resolved relative to this file. The `.ts` entry is used
// when this module itself was loaded from a `.ts` URL, otherwise the `.js` one.
const workerEntryUrl = new URL(
  import.meta.url.endsWith('.ts') ? './worker-entry.ts' : './worker-entry.js',
  import.meta.url,
);

/**
 * Creates a pool of worker threads with configurable load balancing.
 *
 * The returned runner is callable: pass a single task, an array of tasks
 * (resolved together via `Promise.all`), or an {@link AsyncIterableTask}
 * (dispatched as a stream). It also exposes `signal`, `workers`, and both
 * `Symbol.dispose` and `Symbol.asyncDispose`.
 *
 * @param size - Number of worker threads, or options object. Defaults to `os.availableParallelism()`.
 * @param opts - Optional configuration including shutdown timeout and balancer.
 * @returns A disposable {@link Runner} for dispatching tasks.
 */
export function workers(): Runner;
export function workers(size: number, opts?: WorkerOptions): Runner;
export function workers(opts: WorkerOptions): Runner;
export function workers(sizeOrOpts?: number | WorkerOptions, opts?: WorkerOptions): Runner {
  let size: number;
  if (typeof sizeOrOpts === 'object') {
    // Options-only overload: the first argument carries the options.
    opts = sizeOrOpts;
    size = availableParallelism();
  } else {
    size = sizeOrOpts ?? availableParallelism();
  }

  const balancer: Balancer = opts?.balance ?? roundRobin();

  const pool: Worker[] = [];
  for (let i = 0; i < size; i++) {
    const worker = new Worker(workerEntryUrl);
    setupWorker(worker);
    pool.push(worker);
  }

  let disposed = false;
  const ac = new AbortController();
  // Promises of work that has been dispatched but has not settled yet.
  const inflight = new Set<Promise<unknown>>();
  // Number of unsettled tasks per worker handle (exposed as `activeCount`).
  const activeCounts = new Map<WorkerHandle, number>();

  /**
   * Tracks a value-returning task on `handle` until `promise` settles.
   *
   * A rejection is re-thrown as a new error built with the same constructor
   * (or a plain `Error` for non-Error values) carrying the original as `cause`.
   * NOTE(review): presumably done so the stack points at the dispatch site;
   * this assumes custom error constructors accept `(message, options)`.
   */
  async function trackValue<T>(handle: WorkerHandle, promise: Promise<T>): Promise<T> {
    inflight.add(promise);
    activeCounts.set(handle, (activeCounts.get(handle) ?? 0) + 1);
    try {
      return await promise;
    } catch (err) {
      if (err instanceof Error) {
        const Ctor = err.constructor as ErrorConstructor;
        throw new Ctor(err.message, { cause: err });
      }
      throw new Error(String(err), { cause: err });
    } finally {
      inflight.delete(promise);
      activeCounts.set(handle, (activeCounts.get(handle) ?? 1) - 1);
    }
  }

  /**
   * Tracks a streaming task on `handle` until its `done` promise settles.
   *
   * The bookkeeping is released on fulfilment AND on rejection; registering
   * only a fulfilment handler would leak the in-flight entry and the active
   * count when the stream fails, and leave the derived promise unhandled.
   * NOTE(review): the rejection reason is not re-thrown here — presumably the
   * failure reaches the consumer through the stream's iterable; confirm in
   * `dispatchStream`.
   */
  function trackStream(handle: WorkerHandle, done: Promise<void>): void {
    inflight.add(done);
    activeCounts.set(handle, (activeCounts.get(handle) ?? 0) + 1);
    const release = (): void => {
      inflight.delete(done);
      activeCounts.set(handle, (activeCounts.get(handle) ?? 1) - 1);
    };
    done.then(release, release);
  }

  /** Terminates every worker thread and empties the pool. */
  function terminateAll(): void {
    for (const worker of pool) {
      worker.terminate();
    }
    pool.length = 0;
  }

  /**
   * Synchronous balancer teardown: prefers `Symbol.dispose`, falling back to
   * `Symbol.asyncDispose`, whose promise is intentionally not awaited.
   */
  function disposeBalancer(): void {
    if (Symbol.dispose in balancer) {
      balancer[Symbol.dispose]!();
    } else if (Symbol.asyncDispose in balancer) {
      // Fire-and-forget: a sync disposer cannot await the async one.
      void (balancer[Symbol.asyncDispose]! as () => Promise<void>)();
    }
  }

  /**
   * Asynchronous balancer teardown: prefers `Symbol.asyncDispose` (awaited),
   * falling back to `Symbol.dispose`.
   */
  async function asyncDisposeBalancer(): Promise<void> {
    if (Symbol.asyncDispose in balancer) {
      await balancer[Symbol.asyncDispose]!();
    } else if (Symbol.dispose in balancer) {
      balancer[Symbol.dispose]!();
    }
  }

  /**
   * Picks the worker for `task`: the task's pinned `worker` handle when it
   * belongs to this pool, otherwise whatever the balancer selects.
   */
  function resolveWorkerAndHandle(task: Task): { worker: Worker; handle: WorkerHandle } {
    if (task.worker != null) {
      const idx = workerHandles.indexOf(task.worker);
      if (idx !== -1) return { worker: pool[idx], handle: workerHandles[idx] };
    }
    const handle = balancer.select(workerHandles, task);
    const idx = workerHandles.indexOf(handle);
    return { worker: pool[idx], handle };
  }

  /** Dispatches one value-returning task; rejects if the pool is disposed. */
  function dispatch<T>(task: Task): Promise<T> {
    if (disposed) return Promise.reject(new Error('Worker pool is disposed'));
    const { worker, handle } = resolveWorkerAndHandle(task);
    return trackValue(handle, execute<T>(worker, task.id, task.args));
  }

  /**
   * Builds the public handle for one worker. `exec` bypasses the balancer and
   * runs the task directly on that worker: stream tasks return the iterable
   * (throwing synchronously if disposed), other tasks return a promise
   * (rejecting if disposed).
   */
  function makeWorkerHandle(worker: Worker): WorkerHandle {
    const handle: WorkerHandle = {
      exec<T>(task: Task, channelOpts?: ChannelOptions): any {
        if (task instanceof AsyncIterableTask) {
          if (disposed) throw new Error('Worker pool is disposed');
          const { iterable, done } = dispatchStream(worker, task.id, task.args, channelOpts);
          trackStream(handle, done);
          return iterable;
        }
        if (disposed) return Promise.reject(new Error('Worker pool is disposed'));
        return trackValue(handle, execute<T>(worker, task.id, task.args));
      },
      get thread() {
        return worker;
      },
      get activeCount() {
        return activeCounts.get(handle) ?? 0;
      },
    };
    return handle;
  }

  // Handles are index-aligned with `pool`; the array is frozen before it is
  // handed to balancers and callers.
  const workerHandles: readonly WorkerHandle[] = Object.freeze(pool.map((worker) => makeWorkerHandle(worker)));

  const run: Runner = Object.assign(
    (taskOrTasks: Task | Task[] | (Task & AsyncIterable<any>), channelOpts?: ChannelOptions): any => {
      if (taskOrTasks instanceof AsyncIterableTask) {
        if (disposed) throw new Error('Worker pool is disposed');
        const { worker, handle } = resolveWorkerAndHandle(taskOrTasks);
        const { iterable, done } = dispatchStream(worker, taskOrTasks.id, taskOrTasks.args, channelOpts);
        trackStream(handle, done);
        return iterable;
      }
      if (Array.isArray(taskOrTasks)) {
        return Promise.all(taskOrTasks.map((t) => dispatch(t)));
      }
      return dispatch(taskOrTasks);
    },
    {
      get signal() {
        return ac.signal;
      },
      get workers() {
        return workerHandles;
      },
      // Immediate shutdown: aborts the signal, disposes the balancer and
      // terminates all workers without waiting for in-flight work.
      [Symbol.dispose]() {
        disposed = true;
        ac.abort();
        disposeBalancer();
        terminateAll();
      },
      // Graceful shutdown: waits for in-flight work to settle (bounded by
      // `shutdownTimeout` when provided), then disposes and terminates.
      async [Symbol.asyncDispose]() {
        disposed = true;
        ac.abort();
        const settle = Promise.allSettled(inflight);
        if (opts?.shutdownTimeout != null) {
          const timeoutAc = new AbortController();
          await Promise.race([
            // Cancel the timer as soon as all work has settled.
            settle.finally(() => timeoutAc.abort()),
            // The aborted timer rejects; that rejection is expected and ignored.
            setTimeout(opts.shutdownTimeout, undefined, { signal: timeoutAc.signal }).catch(() => {}),
          ]);
        } else {
          await settle;
        }
        await asyncDisposeBalancer();
        terminateAll();
      },
    },
  );

  return run;
}