Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: streaming pipeline chaining and task-arg support

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+74 -1
+10 -1
src/execute.ts
··· 5 5 import { serializeArg, deserializeArg } from './shared/reconstruct.ts'; 6 6 import { extractTransferables, collectTransferables } from './transfer.ts'; 7 7 import { Task } from './task.ts'; 8 + import { StreamTask } from './stream-task.ts'; 9 + import { runStreamOnDedicated } from './dedicated-runner.ts'; 8 10 import { STREAM } from './stream.ts'; 9 11 import type { StreamOptions } from './stream.ts'; 10 12 ··· 69 71 function prepareArg(arg: unknown): unknown { 70 72 if (typeof arg === 'object' && arg !== null && STREAM in arg) { 71 73 const data = (arg as any)[STREAM]; 74 + let iterable = data.iterable; 72 75 const highWater = data.options?.highWaterMark ?? DEFAULT_HIGH_WATER; 76 + 77 + // If the iterable is a StreamTask, dispatch it to its dedicated worker first 78 + if (iterable instanceof StreamTask) { 79 + iterable = runStreamOnDedicated(iterable.id, iterable.args); 80 + } 81 + 73 82 const { port1, port2 } = new MessageChannel(); 74 83 port1.unref(); 75 - pipeToPort(data.iterable, port1, highWater); 84 + pipeToPort(iterable, port1, highWater); 76 85 streamPortStack[streamPortStack.length - 1].push(port2); 77 86 return port2; 78 87 }
+29
test/stream-context.test.ts
··· 1 + import { describe, it } from 'node:test'; 2 + import assert from 'node:assert/strict'; 3 + import { mo, workers } from 'moroutine'; 4 + 5 + const makeMultiplier = mo(import.meta, (factor: number): number => { 6 + return factor; 7 + }); 8 + 9 + const streamMultiplied = mo(import.meta, async function* (factor: number, count: number) { 10 + for (let i = 0; i < count; i++) { 11 + yield i * factor; 12 + } 13 + }); 14 + 15 + describe('streaming with task-args', () => { 16 + it('resolves task-args before streaming', async () => { 17 + const factor = makeMultiplier(3); 18 + const run = workers(1); 19 + try { 20 + const results: number[] = []; 21 + for await (const value of run(streamMultiplied(factor, 4))) { 22 + results.push(value); 23 + } 24 + assert.deepEqual(results, [0, 3, 6, 9]); 25 + } finally { 26 + run[Symbol.dispose](); 27 + } 28 + }); 29 + });
+35
test/stream-pipeline.test.ts
··· 1 + import { describe, it } from 'node:test'; 2 + import assert from 'node:assert/strict'; 3 + import { mo, stream } from 'moroutine'; 4 + 5 + const generate = mo(import.meta, async function* (n: number) { 6 + for (let i = 1; i <= n; i++) yield i; 7 + }); 8 + 9 + const double = mo(import.meta, async function* (input: AsyncIterable<number>) { 10 + for await (const n of input) yield n * 2; 11 + }); 12 + 13 + const square = mo(import.meta, async function* (input: AsyncIterable<number>) { 14 + for await (const n of input) yield n * n; 15 + }); 16 + 17 + describe('streaming pipeline', () => { 18 + it('chains two streaming moroutines', async () => { 19 + const results: number[] = []; 20 + for await (const value of double(stream(generate(3)))) { 21 + results.push(value); 22 + } 23 + assert.deepEqual(results, [2, 4, 6]); 24 + }); 25 + 26 + it('chains three streaming moroutines', async () => { 27 + const results: number[] = []; 28 + const doubled = double(stream(generate(3))); 29 + const squared = square(stream(doubled)); 30 + for await (const value of squared) { 31 + results.push(value); 32 + } 33 + assert.deepEqual(results, [4, 16, 36]); 34 + }); 35 + });