Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: pipeline example and process exit verification

Fix worker ref/unref lifecycle so dedicated workers don't hold the event
loop open when idle: setupWorker before unref (so unref wins), then
ref/unref around each execute() call and for the duration of each
dispatchStream(). Streaming port1 is unref'd after its listener is
added, so stream completion triggers natural exit.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+107 -3
+22
examples/pipeline/main.ts
··· 1 + // Pipeline of streaming moroutines, each step on its own dedicated worker. 2 + // Data flows: generate → double → square → toString, all via MessageChannel. 3 + // Requires Node v24+. 4 + // 5 + // Run: node examples/pipeline/main.ts 6 + 7 + import { stream } from '../../src/index.ts'; 8 + import { generate, double, square, toString } from './steps.ts'; 9 + 10 + const numbers = generate(5); 11 + const doubled = double(stream(numbers)); 12 + const squared = square(stream(doubled)); 13 + const labels = toString(stream(squared)); 14 + 15 + for await (const label of labels) { 16 + console.log(label); 17 + } 18 + // => 4 19 + // => 16 20 + // => 36 21 + // => 64 22 + // => 100
+25
examples/pipeline/steps.ts
··· 1 + import { mo } from '../../src/index.ts'; 2 + 3 + export const generate = mo(import.meta, async function* (count: number) { 4 + for (let i = 1; i <= count; i++) { 5 + yield i; 6 + } 7 + }); 8 + 9 + export const double = mo(import.meta, async function* (input: AsyncIterable<number>) { 10 + for await (const n of input) { 11 + yield n * 2; 12 + } 13 + }); 14 + 15 + export const square = mo(import.meta, async function* (input: AsyncIterable<number>) { 16 + for await (const n of input) { 17 + yield n * n; 18 + } 19 + }); 20 + 21 + export const toString = mo(import.meta, async function* (input: AsyncIterable<number>) { 22 + for await (const n of input) { 23 + yield `=> ${n}`; 24 + } 25 + });
+1 -1
src/dedicated-runner.ts
··· 8 8 let worker = workers.get(id); 9 9 if (!worker) { 10 10 worker = new Worker(workerEntryUrl); 11 - worker.unref(); 12 11 setupWorker(worker); 12 + worker.unref(); 13 13 workers.set(id, worker); 14 14 } 15 15 return worker;
+15 -1
src/execute.ts
··· 19 19 const call = pending.get(msg.callId); 20 20 if (!call) return; 21 21 pending.delete(msg.callId); 22 + // Unref the worker now that this call is done (re-ref happened in execute()) 23 + worker.unref(); 22 24 if (msg.error !== undefined) { 23 25 call.reject(new Error(msg.error)); 24 26 } else { ··· 95 97 const url = id.slice(0, id.lastIndexOf('#')); 96 98 freezeModule(url); 97 99 const callId = nextCallId++; 100 + // Ref the worker for the duration of this call so the event loop stays alive 101 + worker.ref(); 98 102 return new Promise<T>((resolve, reject) => { 99 103 pending.set(callId, { resolve, reject }); 100 104 const extracted = extractTransferables(args); ··· 112 116 const highWater = opts?.highWaterMark ?? DEFAULT_HIGH_WATER; 113 117 114 118 const { port1, port2 } = new MessageChannel(); 115 - port1.unref(); 116 119 117 120 const extracted = extractTransferables(args); 118 121 streamPortStack.push([]); 119 122 const preparedArgs = extracted.args.map(prepareArg); 120 123 const ports = streamPortStack.pop()!; 121 124 const msg = { id, args: preparedArgs, port: port2 }; 125 + // Ref the worker for the duration of the stream so the event loop stays alive 126 + worker.ref(); 122 127 worker.postMessage(msg, [...extracted.transfer, ...ports, port2] as any[]); 123 128 124 129 const queue: T[] = []; ··· 126 131 let error: Error | null = null; 127 132 let paused = false; 128 133 let waiting: (() => void) | null = null; 134 + let workerUnrefed = false; 135 + 136 + function unrefWorkerOnce() { 137 + if (!workerUnrefed) { workerUnrefed = true; worker.unref(); } 138 + } 129 139 130 140 port1.on('message', (msg: { value?: unknown; done?: boolean; error?: string }) => { 131 141 if (msg.error) { 132 142 error = new Error(msg.error); 133 143 done = true; 134 144 port1.close(); 145 + unrefWorkerOnce(); 135 146 if (waiting) { waiting(); waiting = null; } 136 147 return; 137 148 } 138 149 if (msg.done) { 139 150 done = true; 140 151 port1.close(); 152 + unrefWorkerOnce(); 141 153 if (waiting) { waiting(); waiting = null; } 142 154 return; 143 155 } ··· 148 160 port1.postMessage('pause'); 149 161 } 150 162 }); 163 + port1.unref(); 151 164 152 165 return { 153 166 [Symbol.asyncIterator]() { ··· 169 182 }, 170 183 async return(): Promise<IteratorResult<T>> { 171 184 port1.close(); 185 + unrefWorkerOnce(); 172 186 return { done: true, value: undefined }; 173 187 }, 174 188 };
+1 -1
src/worker-pool.ts
··· 17 17 const pool: Worker[] = []; 18 18 for (let i = 0; i < size; i++) { 19 19 const worker = new Worker(workerEntryUrl); 20 - worker.unref(); 21 20 setupWorker(worker); 21 + worker.unref(); 22 22 pool.push(worker); 23 23 } 24 24
+4
test/fixtures/exit-dedicated-main.ts
··· 1 + import { gen } from './exit-gen.ts'; 2 + 3 + for await (const v of gen()) {} 4 + console.log('DONE');
+3
test/fixtures/exit-gen.ts
··· 1 + import { mo } from 'moroutine'; 2 + 3 + export const gen = mo(import.meta, async function* () { yield 1; yield 2; });
+7
test/fixtures/exit-pool-main.ts
··· 1 + import { workers } from 'moroutine'; 2 + import { gen } from './exit-gen.ts'; 3 + 4 + const run = workers(1); 5 + for await (const v of run(gen())) {} 6 + run[Symbol.dispose](); 7 + console.log('DONE');
+29
test/stream-exit.test.ts
··· 1 + import { describe, it } from 'node:test'; 2 + import assert from 'node:assert/strict'; 3 + import { execFile } from 'node:child_process'; 4 + import { promisify } from 'node:util'; 5 + import { fileURLToPath } from 'node:url'; 6 + import { join } from 'node:path'; 7 + 8 + const exec = promisify(execFile); 9 + const fixturesDir = join(fileURLToPath(import.meta.url), '..', 'fixtures'); 10 + 11 + describe('streaming process exit', () => { 12 + it('process exits after streaming on dedicated worker', async () => { 13 + const { stdout } = await exec(process.execPath, [ 14 + '--no-warnings', 15 + '--experimental-strip-types', 16 + join(fixturesDir, 'exit-dedicated-main.ts'), 17 + ], { timeout: 5000 }); 18 + assert.ok(stdout.includes('DONE')); 19 + }); 20 + 21 + it('process exits after streaming with pool', async () => { 22 + const { stdout } = await exec(process.execPath, [ 23 + '--no-warnings', 24 + '--experimental-strip-types', 25 + join(fixturesDir, 'exit-pool-main.ts'), 26 + ], { timeout: 5000 }); 27 + assert.ok(stdout.includes('DONE')); 28 + }); 29 + });