Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at e2cebd539403475e2f4e79d55ca1a84a0ce3510d 59 lines 1.6 kB view raw
1import { Worker } from 'node:worker_threads'; 2import { setupWorker, execute, dispatchStream } from './execute.ts'; 3 4const workerEntryUrl = new URL( 5 import.meta.url.endsWith('.ts') ? './worker-entry.ts' : './worker-entry.js', 6 import.meta.url, 7); 8const workers = new Map<string, Worker>(); 9const activeCounts = new Map<Worker, number>(); 10 11function getWorker(id: string): Worker { 12 let worker = workers.get(id); 13 if (!worker) { 14 worker = new Worker(workerEntryUrl); 15 setupWorker(worker); 16 worker.unref(); 17 workers.set(id, worker); 18 } 19 return worker; 20} 21 22function ref(worker: Worker): void { 23 const count = activeCounts.get(worker) ?? 0; 24 if (count === 0) worker.ref(); 25 activeCounts.set(worker, count + 1); 26} 27 28function unref(worker: Worker): void { 29 const count = (activeCounts.get(worker) ?? 1) - 1; 30 activeCounts.set(worker, count); 31 if (count === 0) worker.unref(); 32} 33 34export async function runOnDedicated<T>(id: string, args: unknown[]): Promise<T> { 35 const worker = getWorker(id); 36 ref(worker); 37 try { 38 return await execute<T>(worker, id, args); 39 } catch (err) { 40 if (err instanceof Error) { 41 const Ctor = err.constructor as ErrorConstructor; 42 throw new Ctor(err.message, { cause: err }); 43 } 44 throw new Error(String(err), { cause: err }); 45 } finally { 46 unref(worker); 47 } 48} 49 50export function runStreamOnDedicated<T>(id: string, args: unknown[]): AsyncIterable<T> { 51 const worker = getWorker(id); 52 ref(worker); 53 const { iterable, done } = dispatchStream<T>(worker, id, args); 54 done.then( 55 () => unref(worker), 56 () => unref(worker), 57 ); 58 return iterable; 59}