Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: async dispose with signal, in-flight tracking, and shutdown timeout

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+91 -16
+34 -16
src/worker-pool.ts
··· 4 4 import type { Task } from './task.ts'; 5 5 import { StreamTask } from './stream-task.ts'; 6 6 import type { ChannelOptions } from './channel.ts'; 7 - import type { Runner } from './runner.ts'; 7 + import type { Runner, WorkerOptions } from './runner.ts'; 8 8 9 9 const workerEntryUrl = new URL('./worker-entry.ts', import.meta.url); 10 10 11 11 /** 12 12 * Creates a pool of worker threads that dispatch tasks with round-robin scheduling. 13 13 * @param size - Number of worker threads in the pool. Defaults to the number of available CPUs. 14 + * @param opts - Optional configuration including shutdown timeout. 14 15 * @returns A disposable {@link Runner} for dispatching tasks. 15 16 */ 16 - export function workers(size: number = availableParallelism()): Runner { 17 + export function workers(size: number = availableParallelism(), opts?: WorkerOptions): Runner { 17 18 const pool: Worker[] = []; 18 19 for (let i = 0; i < size; i++) { 19 20 const worker = new Worker(workerEntryUrl); ··· 24 25 25 26 let next = 0; 26 27 let disposed = false; 28 + const ac = new AbortController(); 29 + const inflight = new Set<Promise<unknown>>(); 30 + 31 + function track<T>(promise: Promise<T>): Promise<T> { 32 + inflight.add(promise); 33 + promise.finally(() => inflight.delete(promise)); 34 + return promise; 35 + } 27 36 28 - const abortController = new AbortController(); 37 + function terminateAll(): void { 38 + for (const worker of pool) { 39 + worker.terminate(); 40 + } 41 + pool.length = 0; 42 + } 29 43 30 44 function dispatch<T>(task: Task<T>): Promise<T> { 31 45 if (disposed) return Promise.reject(new Error('Worker pool is disposed')); 32 46 const worker = pool[next % pool.length]; 33 47 next++; 34 - return execute<T>(worker, task.id, task.args); 48 + return track(execute<T>(worker, task.id, task.args)); 35 49 } 36 50 37 51 const run: Runner = Object.assign( 38 - (taskOrTasks: Task<any> | Task<any>[] | StreamTask<any>, opts?: ChannelOptions): any => { 52 + (taskOrTasks: Task<any> | Task<any>[] | StreamTask<any>, channelOpts?: ChannelOptions): any => { 39 53 if (taskOrTasks instanceof StreamTask) { 40 54 if (disposed) throw new Error('Worker pool is disposed'); 41 55 const worker = pool[next % pool.length]; 42 56 next++; 43 - const { iterable } = dispatchStream(worker, taskOrTasks.id, taskOrTasks.args, opts); 57 + const { iterable, done } = dispatchStream(worker, taskOrTasks.id, taskOrTasks.args, channelOpts); 58 + track(done); 44 59 return iterable; 45 60 } 46 61 if (Array.isArray(taskOrTasks)) { ··· 50 65 }, 51 66 { 52 67 get signal() { 53 - return abortController.signal; 68 + return ac.signal; 54 69 }, 55 70 [Symbol.dispose]() { 56 71 disposed = true; 57 - abortController.abort(); 58 - for (const worker of pool) { 59 - worker.terminate(); 60 - } 61 - pool.length = 0; 72 + ac.abort(); 73 + terminateAll(); 62 74 }, 63 75 async [Symbol.asyncDispose]() { 64 - abortController.abort(); 65 76 disposed = true; 66 - for (const worker of pool) { 67 - worker.terminate(); 77 + ac.abort(); 78 + const settle = Promise.allSettled(inflight); 79 + if (opts?.shutdownTimeout != null) { 80 + await Promise.race([ 81 + settle, 82 + new Promise<void>((r) => setTimeout(r, opts.shutdownTimeout)), 83 + ]); 84 + } else { 85 + await settle; 68 86 } 69 - pool.length = 0; 87 + terminateAll(); 70 88 }, 71 89 }, 72 90 );
+44
test/async-dispose.test.ts
··· 1 + import { describe, it } from 'node:test'; 2 + import assert from 'node:assert/strict'; 3 + import { workers } from 'moroutine'; 4 + import { slowTask, waitForAbort } from './fixtures/async-dispose.ts'; 5 + 6 + describe('async dispose', () => { 7 + it('waits for in-flight tasks to complete', async () => { 8 + const run = workers(1); 9 + const promise = run(slowTask(100)); 10 + await run[Symbol.asyncDispose](); 11 + const result = await promise; 12 + assert.equal(result, 'done'); 13 + }); 14 + 15 + it('fires signal on async dispose', async () => { 16 + const run = workers(1); 17 + const promise = run(waitForAbort(run.signal)); 18 + await run[Symbol.asyncDispose](); 19 + const result = await promise; 20 + assert.equal(result, 'aborted'); 21 + }); 22 + 23 + it('fires signal on sync dispose', async () => { 24 + const run = workers(1); 25 + assert.equal(run.signal.aborted, false); 26 + run[Symbol.dispose](); 27 + assert.equal(run.signal.aborted, true); 28 + }); 29 + 30 + it('rejects new tasks after async dispose starts', async () => { 31 + const run = workers(1); 32 + await run[Symbol.asyncDispose](); 33 + await assert.rejects(() => run(slowTask(10)), { message: /disposed/ }); 34 + }); 35 + 36 + it('force-terminates after shutdownTimeout', async () => { 37 + const run = workers(1, { shutdownTimeout: 50 }); 38 + run(slowTask(10_000)); // will not finish in time 39 + const start = performance.now(); 40 + await run[Symbol.asyncDispose](); 41 + const elapsed = performance.now() - start; 42 + assert.ok(elapsed < 500, `Expected fast teardown, took ${elapsed}ms`); 43 + }); 44 + });
+13
test/fixtures/async-dispose.ts
··· 1 + import { mo } from 'moroutine'; 2 + 3 + export const slowTask = mo(import.meta, async (ms: number): Promise<string> => { 4 + await new Promise((r) => setTimeout(r, ms)); 5 + return 'done'; 6 + }); 7 + 8 + export const waitForAbort = mo(import.meta, (signal: AbortSignal): Promise<string> => { 9 + return new Promise((resolve) => { 10 + if (signal.aborted) { resolve('aborted'); return; } 11 + signal.addEventListener('abort', () => resolve('aborted')); 12 + }); 13 + });