Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: rename stream() to channel()

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

Devin Ivy 80d548be 779fde94

+68 -68
+4 -4
examples/pipeline/main.ts
··· 4 4 // 5 5 // Run: node examples/pipeline/main.ts 6 6 7 - import { stream } from '../../src/index.ts'; 7 + import { channel } from '../../src/index.ts'; 8 8 import { generate, double, square, toString } from './steps.ts'; 9 9 10 10 const numbers = generate(5); 11 - const doubled = double(stream(numbers)); 12 - const squared = square(stream(doubled)); 13 - const labels = toString(stream(squared)); 11 + const doubled = double(channel(numbers)); 12 + const squared = square(channel(doubled)); 13 + const labels = toString(channel(squared)); 14 14 15 15 for await (const label of labels) { 16 16 console.log(label);
+24
src/channel.ts
··· 1 + const CHANNEL = Symbol.for('moroutine.channel'); 2 + 3 + export interface ChannelOptions { 4 + highWaterMark?: number; 5 + } 6 + 7 + export interface ChannelMarker<T> { 8 + readonly [CHANNEL]: { 9 + iterable: AsyncIterable<T>; 10 + options?: ChannelOptions; 11 + }; 12 + } 13 + 14 + /** 15 + * Wraps an AsyncIterable for streaming to a worker. 16 + * @param iterable - The async iterable to pipe to the worker. 17 + * @param opts - Optional. highWaterMark controls backpressure buffering (default: 16). 18 + * @returns The iterable, typed as AsyncIterable<T> for transparent moroutine arg use. 19 + */ 20 + export function channel<T>(iterable: AsyncIterable<T>, opts?: ChannelOptions): AsyncIterable<T> { 21 + return { [CHANNEL]: { iterable, options: opts } } as unknown as AsyncIterable<T>; 22 + } 23 + 24 + export { CHANNEL };
+5 -5
src/execute.ts
··· 7 7 import { Task } from './task.ts'; 8 8 import { StreamTask } from './stream-task.ts'; 9 9 import { runStreamOnDedicated } from './dedicated-runner.ts'; 10 - import { STREAM } from './stream.ts'; 11 - import type { StreamOptions } from './stream.ts'; 10 + import { CHANNEL } from './channel.ts'; 11 + import type { ChannelOptions } from './channel.ts'; 12 12 13 13 let nextCallId = 0; 14 14 const pending = new Map<number, { resolve: (value: any) => void; reject: (reason: any) => void }>(); ··· 71 71 } 72 72 73 73 function prepareArg(arg: unknown): unknown { 74 - if (typeof arg === 'object' && arg !== null && STREAM in arg) { 75 - const data = (arg as any)[STREAM]; 74 + if (typeof arg === 'object' && arg !== null && CHANNEL in arg) { 75 + const data = (arg as any)[CHANNEL]; 76 76 let iterable = data.iterable; 77 77 const highWater = data.options?.highWaterMark ?? DEFAULT_HIGH_WATER; 78 78 ··· 110 110 }); 111 111 } 112 112 113 - export function dispatchStream<T>(worker: Worker, id: string, args: unknown[], opts?: StreamOptions): AsyncIterable<T> { 113 + export function dispatchStream<T>(worker: Worker, id: string, args: unknown[], opts?: ChannelOptions): AsyncIterable<T> { 114 114 const url = id.slice(0, id.lastIndexOf('#')); 115 115 freezeModule(url); 116 116 const highWater = opts?.highWaterMark ?? DEFAULT_HIGH_WATER;
+2 -2
src/index.ts
··· 2 2 export type { Arg } from './mo.ts'; 3 3 export { Task } from './task.ts'; 4 4 export { StreamTask } from './stream-task.ts'; 5 - export { stream } from './stream.ts'; 6 - export type { StreamOptions } from './stream.ts'; 5 + export { channel } from './channel.ts'; 6 + export type { ChannelOptions } from './channel.ts'; 7 7 export { workers } from './worker-pool.ts'; 8 8 export { transfer } from './transfer.ts'; 9 9 export type { Runner } from './runner.ts';
+2 -2
src/runner.ts
··· 1 1 import type { Task } from './task.ts'; 2 2 import type { StreamTask } from './stream-task.ts'; 3 - import type { StreamOptions } from './stream.ts'; 3 + import type { ChannelOptions } from './channel.ts'; 4 4 5 5 type TaskResults<T extends Task<any>[]> = { [K in keyof T]: T[K] extends Task<infer R> ? R : never }; 6 6 ··· 8 8 export type Runner = { 9 9 <T>(task: Task<T>): Promise<T>; 10 10 <T extends Task<any>[]>(tasks: [...T]): Promise<TaskResults<T>>; 11 - <T>(task: StreamTask<T>, opts?: StreamOptions): AsyncIterable<T>; 11 + <T>(task: StreamTask<T>, opts?: ChannelOptions): AsyncIterable<T>; 12 12 [Symbol.dispose](): void; 13 13 };
-24
src/stream.ts
··· 1 - const STREAM = Symbol.for('moroutine.stream'); 2 - 3 - export interface StreamOptions { 4 - highWaterMark?: number; 5 - } 6 - 7 - export interface StreamMarker<T> { 8 - readonly [STREAM]: { 9 - iterable: AsyncIterable<T>; 10 - options?: StreamOptions; 11 - }; 12 - } 13 - 14 - /** 15 - * Wraps an AsyncIterable for streaming to a worker. 16 - * @param iterable - The async iterable to pipe to the worker. 17 - * @param opts - Optional. highWaterMark controls backpressure buffering (default: 16). 18 - * @returns The iterable, typed as AsyncIterable<T> for transparent moroutine arg use. 19 - */ 20 - export function stream<T>(iterable: AsyncIterable<T>, opts?: StreamOptions): AsyncIterable<T> { 21 - return { [STREAM]: { iterable, options: opts } } as unknown as AsyncIterable<T>; 22 - } 23 - 24 - export { STREAM };
+2 -2
src/worker-pool.ts
··· 3 3 import { setupWorker, execute, dispatchStream } from './execute.ts'; 4 4 import type { Task } from './task.ts'; 5 5 import { StreamTask } from './stream-task.ts'; 6 - import type { StreamOptions } from './stream.ts'; 6 + import type { ChannelOptions } from './channel.ts'; 7 7 import type { Runner } from './runner.ts'; 8 8 9 9 const workerEntryUrl = new URL('./worker-entry.ts', import.meta.url); ··· 33 33 } 34 34 35 35 const run: Runner = Object.assign( 36 - (taskOrTasks: Task<any> | Task<any>[] | StreamTask<any>, opts?: StreamOptions): any => { 36 + (taskOrTasks: Task<any> | Task<any>[] | StreamTask<any>, opts?: ChannelOptions): any => { 37 37 if (taskOrTasks instanceof StreamTask) { 38 38 if (disposed) throw new Error('Worker pool is disposed'); 39 39 const worker = pool[next % pool.length];
+18
test/channel-wrapper.test.ts
··· 1 + import { describe, it } from 'node:test'; 2 + import assert from 'node:assert/strict'; 3 + 4 + describe('channel()', () => { 5 + it('returns an object with the channel marker', async () => { 6 + const { channel } = await import('../src/channel.ts'); 7 + async function* gen() { yield 1; } 8 + const wrapped = channel(gen()); 9 + assert.ok(Symbol.for('moroutine.channel') in (wrapped as any)); 10 + }); 11 + 12 + it('accepts highWaterMark option', async () => { 13 + const { channel } = await import('../src/channel.ts'); 14 + async function* gen() { yield 1; } 15 + const wrapped = channel(gen(), { highWaterMark: 32 }); 16 + assert.ok(wrapped); 17 + }); 18 + });
+7 -7
test/stream-input.test.ts
··· 1 1 import { describe, it } from 'node:test'; 2 2 import assert from 'node:assert/strict'; 3 - import { workers, stream } from 'moroutine'; 3 + import { workers, channel } from 'moroutine'; 4 4 import { sumStream, uppercaseStream } from './fixtures/stream-input.ts'; 5 5 6 - describe('stream() input (main -> worker)', () => { 6 + describe('channel() input (main -> worker)', () => { 7 7 it('pipes async iterable to worker', async () => { 8 8 async function* numbers() { yield 1; yield 2; yield 3; } 9 9 const run = workers(1); 10 10 try { 11 - const result = await run(sumStream(stream(numbers()))); 11 + const result = await run(sumStream(channel(numbers()))); 12 12 assert.equal(result, 6); 13 13 } finally { 14 14 run[Symbol.dispose](); 15 15 } 16 16 }); 17 17 18 - it('bidirectional: stream input + generator output', async () => { 18 + it('bidirectional: channel input + generator output', async () => { 19 19 async function* words() { yield 'hello'; yield 'world'; } 20 20 const run = workers(1); 21 21 try { 22 22 const results: string[] = []; 23 - for await (const word of run(uppercaseStream(stream(words())))) { 23 + for await (const word of run(uppercaseStream(channel(words())))) { 24 24 results.push(word); 25 25 } 26 26 assert.deepEqual(results, ['HELLO', 'WORLD']); ··· 29 29 } 30 30 }); 31 31 32 - it('accepts highWaterMark on stream()', async () => { 32 + it('accepts highWaterMark on channel()', async () => { 33 33 async function* numbers() { yield 1; yield 2; yield 3; } 34 34 const run = workers(1); 35 35 try { 36 - const result = await run(sumStream(stream(numbers(), { highWaterMark: 2 }))); 36 + const result = await run(sumStream(channel(numbers(), { highWaterMark: 2 }))); 37 37 assert.equal(result, 6); 38 38 } finally { 39 39 run[Symbol.dispose]();
+4 -4
test/stream-pipeline.test.ts
··· 1 1 import { describe, it } from 'node:test'; 2 2 import assert from 'node:assert/strict'; 3 - import { mo, stream } from 'moroutine'; 3 + import { mo, channel } from 'moroutine'; 4 4 5 5 const generate = mo(import.meta, async function* (n: number) { 6 6 for (let i = 1; i <= n; i++) yield i; ··· 17 17 describe('streaming pipeline', () => { 18 18 it('chains two streaming moroutines', async () => { 19 19 const results: number[] = []; 20 - for await (const value of double(stream(generate(3)))) { 20 + for await (const value of double(channel(generate(3)))) { 21 21 results.push(value); 22 22 } 23 23 assert.deepEqual(results, [2, 4, 6]); ··· 25 25 26 26 it('chains three streaming moroutines', async () => { 27 27 const results: number[] = []; 28 - const doubled = double(stream(generate(3))); 29 - const squared = square(stream(doubled)); 28 + const doubled = double(channel(generate(3))); 29 + const squared = square(channel(doubled)); 30 30 for await (const value of squared) { 31 31 results.push(value); 32 32 }
-18
test/stream-wrapper.test.ts
··· 1 - import { describe, it } from 'node:test'; 2 - import assert from 'node:assert/strict'; 3 - 4 - describe('stream()', () => { 5 - it('returns an object with the stream marker', async () => { 6 - const { stream } = await import('../src/stream.ts'); 7 - async function* gen() { yield 1; } 8 - const wrapped = stream(gen()); 9 - assert.ok(Symbol.for('moroutine.stream') in (wrapped as any)); 10 - }); 11 - 12 - it('accepts highWaterMark option', async () => { 13 - const { stream } = await import('../src/stream.ts'); 14 - async function* gen() { yield 1; } 15 - const wrapped = stream(gen(), { highWaterMark: 32 }); 16 - assert.ok(wrapped); 17 - }); 18 - });