Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: stream() input via MessageChannel with pause/resume backpressure

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+180 -9
+62 -7
src/execute.ts
··· 1 1 import type { Worker } from 'node:worker_threads'; 2 2 import { MessageChannel } from 'node:worker_threads'; 3 + import type { MessagePort, Transferable } from 'node:worker_threads'; 3 4 import { freezeModule } from './registry.ts'; 4 5 import { serializeArg, deserializeArg } from './shared/reconstruct.ts'; 5 - import { extractTransferables } from './transfer.ts'; 6 + import { extractTransferables, collectTransferables } from './transfer.ts'; 6 7 import { Task } from './task.ts'; 8 + import { STREAM } from './stream.ts'; 7 9 import type { StreamOptions } from './stream.ts'; 8 10 9 11 let nextCallId = 0; 10 12 const pending = new Map<number, { resolve: (value: any) => void; reject: (reason: any) => void }>(); 13 + const streamPortStack: MessagePort[][] = []; 11 14 12 15 export function setupWorker(worker: Worker): void { 13 16 worker.on('message', (msg: { callId: number; value?: unknown; error?: string }) => { ··· 22 25 }); 23 26 } 24 27 28 + const DEFAULT_HIGH_WATER = 16; 29 + const LOW_WATER = 1; 30 + 31 + function pipeToPort(iterable: AsyncIterable<unknown>, port: MessagePort, highWaterMark: number): void { 32 + let paused = false; 33 + let resumed: (() => void) | null = null; 34 + let cancelled = false; 35 + 36 + port.on('message', (signal: string) => { 37 + if (signal === 'pause') { paused = true; } 38 + else if (signal === 'resume') { 39 + paused = false; 40 + if (resumed) { resumed(); resumed = null; } 41 + } 42 + }); 43 + 44 + port.on('close', () => { cancelled = true; if (resumed) { resumed(); resumed = null; } }); 45 + 46 + (async () => { 47 + try { 48 + for await (const value of iterable) { 49 + if (cancelled) break; 50 + if (paused) await new Promise<void>((r) => { resumed = r; }); 51 + if (cancelled) break; 52 + const extracted = extractTransferables([value]); 53 + const serialized = serializeArg(extracted.args[0]); 54 + const transferList: Transferable[] = [...extracted.transfer]; 55 + collectTransferables(extracted.args[0], transferList); 56 + port.postMessage({ value: serialized, done: false }, transferList as any[]); 57 + } 58 + if (!cancelled) port.postMessage({ done: true }); 59 + } catch (err) { 60 + if (!cancelled) { 61 + const message = err instanceof Error ? err.message : String(err); 62 + port.postMessage({ done: true, error: message }); 63 + } 64 + } 65 + try { port.close(); } catch {} 66 + })(); 67 + } 68 + 25 69 function prepareArg(arg: unknown): unknown { 70 + if (typeof arg === 'object' && arg !== null && STREAM in arg) { 71 + const data = (arg as any)[STREAM]; 72 + const highWater = data.options?.highWaterMark ?? DEFAULT_HIGH_WATER; 73 + const { port1, port2 } = new MessageChannel(); 74 + port1.unref(); 75 + pipeToPort(data.iterable, port1, highWater); 76 + streamPortStack[streamPortStack.length - 1].push(port2); 77 + return port2; 78 + } 26 79 if (arg instanceof Task) { 27 80 return { __task__: arg.uid, id: arg.id, args: arg.args.map(prepareArg) }; 28 81 } ··· 36 89 return new Promise<T>((resolve, reject) => { 37 90 pending.set(callId, { resolve, reject }); 38 91 const extracted = extractTransferables(args); 39 - const msg = { callId, id, args: extracted.args.map(prepareArg) }; 40 - worker.postMessage(msg, extracted.transfer); 92 + streamPortStack.push([]); 93 + const preparedArgs = extracted.args.map(prepareArg); 94 + const ports = streamPortStack.pop()!; 95 + const msg = { callId, id, args: preparedArgs }; 96 + worker.postMessage(msg, [...extracted.transfer, ...ports] as any[]); 41 97 }); 42 98 } 43 - 44 - const DEFAULT_HIGH_WATER = 16; 45 - const LOW_WATER = 1; 46 99 47 100 export function dispatchStream<T>(worker: Worker, id: string, args: unknown[], opts?: StreamOptions): AsyncIterable<T> { 48 101 const url = id.slice(0, id.lastIndexOf('#')); ··· 53 106 port1.unref(); 54 107 55 108 const extracted = extractTransferables(args); 109 + streamPortStack.push([]); 56 110 const preparedArgs = extracted.args.map(prepareArg); 111 + const ports = streamPortStack.pop()!; 57 112 const msg = { id, args: preparedArgs, port: port2 }; 58 - worker.postMessage(msg, [...extracted.transfer, port2] as any[]); 113 + worker.postMessage(msg, [...extracted.transfer, ...ports, port2] as any[]); 59 114 60 115 const queue: T[] = []; 61 116 let done = false;
+61 -2
src/worker-entry.ts
··· 1 - import { parentPort } from 'node:worker_threads'; 2 - import type { MessagePort, Transferable } from 'node:worker_threads'; 1 + import { parentPort, MessagePort } from 'node:worker_threads'; 2 + import type { Transferable } from 'node:worker_threads'; 3 3 import { registry } from './registry.ts'; 4 4 import { deserializeArg, serializeArg } from './shared/reconstruct.ts'; 5 5 import { collectTransferables } from './transfer.ts'; ··· 11 11 return typeof arg === 'object' && arg !== null && '__task__' in arg; 12 12 } 13 13 14 + function portToAsyncIterable<T>(port: MessagePort): AsyncIterable<T> { 15 + const queue: T[] = []; 16 + let done = false; 17 + let error: Error | null = null; 18 + let paused = false; 19 + let waiting: (() => void) | null = null; 20 + 21 + const HIGH_WATER = 16; 22 + 23 + port.on('message', (msg: { value?: unknown; done?: boolean; error?: string }) => { 24 + if (msg.error) { 25 + error = new Error(msg.error); 26 + done = true; 27 + if (waiting) { waiting(); waiting = null; } 28 + return; 29 + } 30 + if (msg.done) { 31 + done = true; 32 + if (waiting) { waiting(); waiting = null; } 33 + return; 34 + } 35 + queue.push(deserializeArg(msg.value) as T); 36 + if (waiting) { waiting(); waiting = null; } 37 + if (!paused && queue.length >= HIGH_WATER) { 38 + paused = true; 39 + port.postMessage('pause'); 40 + } 41 + }); 42 + 43 + return { 44 + [Symbol.asyncIterator]() { 45 + return { 46 + async next(): Promise<IteratorResult<T>> { 47 + while (true) { 48 + if (queue.length > 0) { 49 + const value = queue.shift()!; 50 + if (paused && queue.length <= 1) { 51 + paused = false; 52 + port.postMessage('resume'); 53 + } 54 + return { done: false, value }; 55 + } 56 + if (error) throw error; 57 + if (done) return { done: true, value: undefined }; 58 + await new Promise<void>((resolve) => { waiting = resolve; }); 59 + } 60 + }, 61 + async return(): Promise<IteratorResult<T>> { 62 + port.close(); 63 + return { done: true, value: undefined }; 64 + }, 65 + }; 66 + }, 67 + }; 68 + } 69 + 14 70 async function resolveArg(arg: unknown): Promise<unknown> { 71 + if (arg instanceof MessagePort) { 72 + return portToAsyncIterable(arg); 73 + } 15 74 if (isTaskArg(arg)) { 16 75 if (taskCache.has(arg.__task__)) { 17 76 return taskCache.get(arg.__task__);
+15
test/fixtures/stream-input.ts
··· 1 + import { mo } from 'moroutine'; 2 + 3 + export const sumStream = mo(import.meta, async (input: AsyncIterable<number>): Promise<number> => { 4 + let sum = 0; 5 + for await (const n of input) { 6 + sum += n; 7 + } 8 + return sum; 9 + }); 10 + 11 + export const uppercaseStream = mo(import.meta, async function* (input: AsyncIterable<string>) { 12 + for await (const s of input) { 13 + yield s.toUpperCase(); 14 + } 15 + });
+42
test/stream-input.test.ts
··· 1 + import { describe, it } from 'node:test'; 2 + import assert from 'node:assert/strict'; 3 + import { workers, stream } from 'moroutine'; 4 + import { sumStream, uppercaseStream } from './fixtures/stream-input.ts'; 5 + 6 + describe('stream() input (main -> worker)', () => { 7 + it('pipes async iterable to worker', async () => { 8 + async function* numbers() { yield 1; yield 2; yield 3; } 9 + const run = workers(1); 10 + try { 11 + const result = await run(sumStream(stream(numbers()))); 12 + assert.equal(result, 6); 13 + } finally { 14 + run[Symbol.dispose](); 15 + } 16 + }); 17 + 18 + it('bidirectional: stream input + generator output', async () => { 19 + async function* words() { yield 'hello'; yield 'world'; } 20 + const run = workers(1); 21 + try { 22 + const results: string[] = []; 23 + for await (const word of run(uppercaseStream(stream(words())))) { 24 + results.push(word); 25 + } 26 + assert.deepEqual(results, ['HELLO', 'WORLD']); 27 + } finally { 28 + run[Symbol.dispose](); 29 + } 30 + }); 31 + 32 + it('accepts highWaterMark on stream()', async () => { 33 + async function* numbers() { yield 1; yield 2; yield 3; } 34 + const run = workers(1); 35 + try { 36 + const result = await run(sumStream(stream(numbers(), { highWaterMark: 2 }))); 37 + assert.equal(result, 6); 38 + } finally { 39 + run[Symbol.dispose](); 40 + } 41 + }); 42 + });