Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: keep pool workers ref'd until disposal

Pool workers are now ref'd for the lifetime of the pool, preventing the
Node.js event loop from exiting prematurely when using top-level await.
Dedicated workers ref/unref with active counting around each dispatch.

Moves ref/unref responsibility out of the shared dispatch layer
(execute/setupWorker) and into the callers — pool workers stay ref'd,
dedicated workers track an active count and only unref when idle.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+65 -20
+7
.changeset/pool-worker-ref.md
··· 1 + --- 2 + 'moroutine': patch 3 + --- 4 + 5 + Fix pool workers exiting early with top-level await 6 + 7 + Pool workers are now ref'd for the lifetime of the pool, preventing the Node.js event loop from exiting prematurely when using `using run = workers()` with top-level `await`. Dedicated workers continue to ref only while tasks are in-flight.
+18 -2
src/dedicated-runner.ts
··· 3 3 4 4 const workerEntryUrl = new URL('./worker-entry.ts', import.meta.url); 5 5 const workers = new Map<string, Worker>(); 6 + const activeCounts = new Map<Worker, number>(); 6 7 7 8 function getWorker(id: string): Worker { 8 9 let worker = workers.get(id); ··· 15 16 return worker; 16 17 } 17 18 19 + function ref(worker: Worker): void { 20 + const count = activeCounts.get(worker) ?? 0; 21 + if (count === 0) worker.ref(); 22 + activeCounts.set(worker, count + 1); 23 + } 24 + 25 + function unref(worker: Worker): void { 26 + const count = (activeCounts.get(worker) ?? 1) - 1; 27 + activeCounts.set(worker, count); 28 + if (count === 0) worker.unref(); 29 + } 30 + 18 31 export function runOnDedicated<T>(id: string, args: unknown[]): Promise<T> { 19 32 const worker = getWorker(id); 20 - return execute<T>(worker, id, args); 33 + ref(worker); 34 + return execute<T>(worker, id, args).finally(() => unref(worker)); 21 35 } 22 36 23 37 export function runStreamOnDedicated<T>(id: string, args: unknown[]): AsyncIterable<T> { 24 38 const worker = getWorker(id); 25 - const { iterable } = dispatchStream<T>(worker, id, args); 39 + ref(worker); 40 + const { iterable, done } = dispatchStream<T>(worker, id, args); 41 + done.then(() => unref(worker), () => unref(worker)); 26 42 return iterable; 27 43 }
-17
src/execute.ts
··· 20 20 const call = pending.get(msg.callId); 21 21 if (!call) return; 22 22 pending.delete(msg.callId); 23 - // Unref the worker now that this call is done (re-ref happened in execute()) 24 - worker.unref(); 25 23 if (msg.error !== undefined) { 26 24 call.reject(new Error(msg.error)); 27 25 } else { ··· 130 128 const url = id.slice(0, id.lastIndexOf('#')); 131 129 freezeModule(url); 132 130 const callId = nextCallId++; 133 - // Ref the worker for the duration of this call so the event loop stays alive 134 - worker.ref(); 135 131 return new Promise<T>((resolve, reject) => { 136 132 pending.set(callId, { resolve, reject }); 137 133 const extracted = extractTransferables(args); ··· 165 161 const preparedArgs = extracted.args.map(prepareArg); 166 162 const ports = streamPortStack.pop()!; 167 163 const msg = { id, args: preparedArgs, port: port2 }; 168 - // Ref the worker for the duration of the stream so the event loop stays alive 169 - worker.ref(); 170 164 worker.postMessage(msg, [...extracted.transfer, ...ports, port2] as any[]); 171 165 172 166 let resolveDone: () => void; ··· 179 173 let error: Error | null = null; 180 174 let paused = false; 181 175 let waiting: (() => void) | null = null; 182 - let workerUnrefed = false; 183 - 184 - function unrefWorkerOnce() { 185 - if (!workerUnrefed) { 186 - workerUnrefed = true; 187 - worker.unref(); 188 - } 189 - } 190 176 191 177 port1.on('message', (msg: { value?: unknown; done?: boolean; error?: string }) => { 192 178 if (msg.error) { 193 179 error = new Error(msg.error); 194 180 done = true; 195 181 port1.close(); 196 - unrefWorkerOnce(); 197 182 resolveDone!(); 198 183 if (waiting) { 199 184 waiting(); ··· 204 189 if (msg.done) { 205 190 done = true; 206 191 port1.close(); 207 - unrefWorkerOnce(); 208 192 resolveDone!(); 209 193 if (waiting) { 210 194 waiting(); ··· 247 231 }, 248 232 async return(): Promise<IteratorResult<T>> { 249 233 port1.close(); 250 - unrefWorkerOnce(); 251 234 resolveDone!(); 252 235 return { done: true, value: undefined }; 253 236 },
-1
src/worker-pool.ts
··· 34 34 for (let i = 0; i < size; i++) { 35 35 const worker = new Worker(workerEntryUrl); 36 36 setupWorker(worker); 37 - worker.unref(); 38 37 pool.push(worker); 39 38 } 40 39
+6
test/fixtures/load-balancing.ts
··· 7 7 await setTimeout(ms); 8 8 return 'done'; 9 9 }); 10 + 11 + export const busy = mo(import.meta, (ms: number): string => { 12 + const start = Date.now(); 13 + while (Date.now() - start < ms) { /* busy wait */ } 14 + return 'done'; 15 + });
+14
test/fixtures/pool-ref-main.ts
··· 1 + import { workers } from 'moroutine'; 2 + import { busy } from './load-balancing.ts'; 3 + 4 + // Dispatches CPU-bound tasks on a dedicated worker, then on a pool. 5 + // The process should complete and print DONE; if pool workers 6 + // are unref'd the event loop may exit early (exit code 13). 7 + 8 + await busy(10); 9 + 10 + { 11 + using run = workers(2); 12 + const results = await run([busy(50), busy(50), busy(50)]); 13 + console.log('DONE ' + results.join(',')); 14 + }
+20
test/pool-ref.test.ts
··· 1 + import { describe, it } from 'node:test'; 2 + import assert from 'node:assert/strict'; 3 + import { execFile } from 'node:child_process'; 4 + import { promisify } from 'node:util'; 5 + import { fileURLToPath } from 'node:url'; 6 + import { join } from 'node:path'; 7 + 8 + const exec = promisify(execFile); 9 + const fixturesDir = join(fileURLToPath(import.meta.url), '..', 'fixtures'); 10 + 11 + describe('pool worker ref behavior', () => { 12 + it('pool keeps event loop alive for top-level await', async () => { 13 + const { stdout } = await exec( 14 + process.execPath, 15 + ['--no-warnings', join(fixturesDir, 'pool-ref-main.ts')], 16 + { timeout: 5000 }, 17 + ); 18 + assert.ok(stdout.includes('DONE')); 19 + }); 20 + });