Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: output streaming via MessageChannel with pause/resume backpressure

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+229 -7
+6 -1
src/dedicated-runner.ts
··· 1 1 import { Worker } from 'node:worker_threads'; 2 - import { setupWorker, execute } from './execute.ts'; 2 + import { setupWorker, execute, dispatchStream } from './execute.ts'; 3 3 4 4 const workerEntryUrl = new URL('./worker-entry.ts', import.meta.url); 5 5 const workers = new Map<string, Worker>(); ··· 19 19 const worker = getWorker(id); 20 20 return execute<T>(worker, id, args); 21 21 } 22 + 23 + export function runStreamOnDedicated<T>(id: string, args: unknown[]): AsyncIterable<T> { 24 + const worker = getWorker(id); 25 + return dispatchStream<T>(worker, id, args); 26 + }
+73
src/execute.ts
··· 1 1 import type { Worker } from 'node:worker_threads'; 2 + import { MessageChannel } from 'node:worker_threads'; 2 3 import { freezeModule } from './registry.ts'; 3 4 import { serializeArg, deserializeArg } from './shared/reconstruct.ts'; 4 5 import { extractTransferables } from './transfer.ts'; 5 6 import { Task } from './task.ts'; 7 + import type { StreamOptions } from './stream.ts'; 6 8 7 9 let nextCallId = 0; 8 10 const pending = new Map<number, { resolve: (value: any) => void; reject: (reason: any) => void }>(); ··· 38 40 worker.postMessage(msg, extracted.transfer); 39 41 }); 40 42 } 43 + 44 + const DEFAULT_HIGH_WATER = 16; 45 + const LOW_WATER = 1; 46 + 47 + export function dispatchStream<T>(worker: Worker, id: string, args: unknown[], opts?: StreamOptions): AsyncIterable<T> { 48 + const url = id.slice(0, id.lastIndexOf('#')); 49 + freezeModule(url); 50 + const highWater = opts?.highWaterMark ?? DEFAULT_HIGH_WATER; 51 + 52 + const { port1, port2 } = new MessageChannel(); 53 + port1.unref(); 54 + 55 + const extracted = extractTransferables(args); 56 + const preparedArgs = extracted.args.map(prepareArg); 57 + const msg = { id, args: preparedArgs, port: port2 }; 58 + worker.postMessage(msg, [...extracted.transfer, port2] as any[]); 59 + 60 + const queue: T[] = []; 61 + let done = false; 62 + let error: Error | null = null; 63 + let paused = false; 64 + let waiting: (() => void) | null = null; 65 + 66 + port1.on('message', (msg: { value?: unknown; done?: boolean; error?: string }) => { 67 + if (msg.error) { 68 + error = new Error(msg.error); 69 + done = true; 70 + port1.close(); 71 + if (waiting) { waiting(); waiting = null; } 72 + return; 73 + } 74 + if (msg.done) { 75 + done = true; 76 + port1.close(); 77 + if (waiting) { waiting(); waiting = null; } 78 + return; 79 + } 80 + queue.push(deserializeArg(msg.value) as T); 81 + if (waiting) { waiting(); waiting = null; } 82 + if (!paused && queue.length >= highWater) { 83 + paused = true; 84 + port1.postMessage('pause'); 85 + } 86 + }); 87 + 88 + return { 89 + [Symbol.asyncIterator]() { 90 + return { 91 + async next(): Promise<IteratorResult<T>> { 92 + while (true) { 93 + if (queue.length > 0) { 94 + const value = queue.shift()!; 95 + if (paused && queue.length <= LOW_WATER) { 96 + paused = false; 97 + port1.postMessage('resume'); 98 + } 99 + return { done: false, value }; 100 + } 101 + if (error) throw error; 102 + if (done) return { done: true, value: undefined }; 103 + await new Promise<void>((resolve) => { waiting = resolve; }); 104 + } 105 + }, 106 + async return(): Promise<IteratorResult<T>> { 107 + port1.close(); 108 + return { done: true, value: undefined }; 109 + }, 110 + }; 111 + }, 112 + }; 113 + }
+3
src/runner.ts
··· 1 1 import type { Task } from './task.ts'; 2 + import type { StreamTask } from './stream-task.ts'; 3 + import type { StreamOptions } from './stream.ts'; 2 4 3 5 type TaskResults<T extends Task<any>[]> = { [K in keyof T]: T[K] extends Task<infer R> ? R : never }; 4 6 ··· 6 8 export type Runner = { 7 9 <T>(task: Task<T>): Promise<T>; 8 10 <T extends Task<any>[]>(tasks: [...T]): Promise<TaskResults<T>>; 11 + <T>(task: StreamTask<T>, opts?: StreamOptions): AsyncIterable<T>; 9 12 [Symbol.dispose](): void; 10 13 };
+7
src/stream-task.ts
··· 1 + import { runStreamOnDedicated } from './dedicated-runner.ts'; 2 + 1 3 let nextUid = 0; 2 4 3 5 /** A deferred streaming computation. When dispatched, returns an AsyncIterable instead of a Promise. */ ··· 10 12 this.uid = nextUid++; 11 13 this.id = id; 12 14 this.args = args; 15 + } 16 + 17 + [Symbol.asyncIterator](): AsyncIterator<T> { 18 + const iterable = runStreamOnDedicated<T>(this.id, this.args); 19 + return iterable[Symbol.asyncIterator](); 13 20 } 14 21 }
+47 -4
src/worker-entry.ts
··· 1 1 import { parentPort } from 'node:worker_threads'; 2 + import type { MessagePort, Transferable } from 'node:worker_threads'; 2 3 import { registry } from './registry.ts'; 3 4 import { deserializeArg, serializeArg } from './shared/reconstruct.ts'; 4 5 import { collectTransferables } from './transfer.ts'; 5 - import type { Transferable } from 'node:worker_threads'; 6 6 7 7 const imported = new Set<string>(); 8 8 const taskCache = new Map<number, unknown>(); ··· 33 33 return deserializeArg(arg); 34 34 } 35 35 36 - parentPort!.on('message', async (msg: { callId: number; id: string; args: unknown[] }) => { 37 - const { callId, id, args } = msg; 36 + parentPort!.on('message', async (msg: { callId?: number; id: string; args: unknown[]; port?: MessagePort }) => { 37 + const { id, args, port } = msg; 38 38 try { 39 39 const url = id.slice(0, id.lastIndexOf('#')); 40 40 if (!imported.has(url)) { ··· 46 46 if (!fn) throw new Error(`Moroutine not found: ${id}`); 47 47 48 48 const resolvedArgs = await Promise.all(args.map(resolveArg)); 49 + 50 + if (port) { 51 + let paused = false; 52 + let resumed: (() => void) | null = null; 53 + let cancelled = false; 54 + 55 + port.on('message', (signal: string) => { 56 + if (signal === 'pause') { paused = true; } 57 + else if (signal === 'resume') { 58 + paused = false; 59 + if (resumed) { resumed(); resumed = null; } 60 + } 61 + }); 62 + 63 + port.on('close', () => { 64 + cancelled = true; 65 + if (resumed) { resumed(); resumed = null; } 66 + }); 67 + 68 + try { 69 + const gen = fn(...resolvedArgs) as AsyncGenerator; 70 + for await (const value of gen) { 71 + if (cancelled) break; 72 + if (paused) await new Promise<void>((r) => { resumed = r; }); 73 + if (cancelled) break; 74 + const serialized = serializeArg(value); 75 + const transferList: Transferable[] = []; 76 + collectTransferables(value, transferList); 77 + port.postMessage({ value: serialized, done: false }, transferList); 78 + await new Promise((r) => setImmediate(r)); 79 + } 80 + if (!cancelled) port.postMessage({ done: true }); 81 + } catch (err) { 82 + if (!cancelled) { 83 + const message = err instanceof Error ? err.message : String(err); 84 + port.postMessage({ done: true, error: message }); 85 + } 86 + } 87 + try { port.close(); } catch {} 88 + return; 89 + } 90 + 91 + const callId = msg.callId!; 49 92 const value = await fn(...resolvedArgs); 50 93 const returnValue = serializeArg(value); 51 94 const transferList: Transferable[] = []; ··· 53 96 parentPort!.postMessage({ callId, value: returnValue }, transferList); 54 97 } catch (err) { 55 98 const message = err instanceof Error ? err.message : String(err); 56 - parentPort!.postMessage({ callId, error: message }); 99 + parentPort!.postMessage({ callId: msg.callId!, error: message }); 57 100 } 58 101 });
+10 -2
src/worker-pool.ts
··· 1 1 import { Worker } from 'node:worker_threads'; 2 2 import { availableParallelism } from 'node:os'; 3 - import { setupWorker, execute } from './execute.ts'; 3 + import { setupWorker, execute, dispatchStream } from './execute.ts'; 4 4 import type { Task } from './task.ts'; 5 + import { StreamTask } from './stream-task.ts'; 6 + import type { StreamOptions } from './stream.ts'; 5 7 import type { Runner } from './runner.ts'; 6 8 7 9 const workerEntryUrl = new URL('./worker-entry.ts', import.meta.url); ··· 31 33 } 32 34 33 35 const run: Runner = Object.assign( 34 - (taskOrTasks: Task<any> | Task<any>[]): any => { 36 + (taskOrTasks: Task<any> | Task<any>[] | StreamTask<any>, opts?: StreamOptions): any => { 37 + if (taskOrTasks instanceof StreamTask) { 38 + if (disposed) throw new Error('Worker pool is disposed'); 39 + const worker = pool[next % pool.length]; 40 + next++; 41 + return dispatchStream(worker, taskOrTasks.id, taskOrTasks.args, opts); 42 + } 35 43 if (Array.isArray(taskOrTasks)) { 36 44 return Promise.all(taskOrTasks.map((t) => dispatch(t))); 37 45 }
+14
test/fixtures/stream-gen.ts
··· 1 + import { mo } from 'moroutine'; 2 + 3 + export const countUp = mo(import.meta, async function* (n: number) { 4 + for (let i = 0; i < n; i++) { 5 + yield i; 6 + } 7 + }); 8 + 9 + export const failAfter = mo(import.meta, async function* (n: number) { 10 + for (let i = 0; i < n; i++) { 11 + yield i; 12 + } 13 + throw new Error('intentional error'); 14 + });
+69
test/stream.test.ts
··· 1 + import { describe, it } from 'node:test'; 2 + import assert from 'node:assert/strict'; 3 + import { workers } from 'moroutine'; 4 + import { countUp, failAfter } from './fixtures/stream-gen.ts'; 5 + 6 + describe('streaming moroutines (worker -> main)', () => { 7 + it('iterates yielded values from worker', async () => { 8 + const run = workers(1); 9 + try { 10 + const results: number[] = []; 11 + for await (const value of run(countUp(5))) { 12 + results.push(value); 13 + } 14 + assert.deepEqual(results, [0, 1, 2, 3, 4]); 15 + } finally { 16 + run[Symbol.dispose](); 17 + } 18 + }); 19 + 20 + it('supports early break', async () => { 21 + const run = workers(1); 22 + try { 23 + const results: number[] = []; 24 + for await (const value of run(countUp(100))) { 25 + results.push(value); 26 + if (results.length >= 3) break; 27 + } 28 + assert.deepEqual(results, [0, 1, 2]); 29 + } finally { 30 + run[Symbol.dispose](); 31 + } 32 + }); 33 + 34 + it('propagates errors from worker generator', async () => { 35 + const run = workers(1); 36 + try { 37 + const results: number[] = []; 38 + await assert.rejects(async () => { 39 + for await (const value of run(failAfter(3))) { 40 + results.push(value); 41 + } 42 + }, { message: 'intentional error' }); 43 + assert.deepEqual(results, [0, 1, 2]); 44 + } finally { 45 + run[Symbol.dispose](); 46 + } 47 + }); 48 + 49 + it('works on dedicated worker without pool', async () => { 50 + const results: number[] = []; 51 + for await (const value of countUp(3)) { 52 + results.push(value); 53 + } 54 + assert.deepEqual(results, [0, 1, 2]); 55 + }); 56 + 57 + it('accepts highWaterMark option', async () => { 58 + const run = workers(1); 59 + try { 60 + const results: number[] = []; 61 + for await (const value of run(countUp(5), { highWaterMark: 2 })) { 62 + results.push(value); 63 + } 64 + assert.deepEqual(results, [0, 1, 2, 3, 4]); 65 + } finally { 66 + run[Symbol.dispose](); 67 + } 68 + }); 69 + });