Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: stream() wrapper for async iterable args

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

+44
+2
src/index.ts
··· 2 2 export type { Arg } from './mo.ts'; 3 3 export { Task } from './task.ts'; 4 4 export { StreamTask } from './stream-task.ts'; 5 + export { stream } from './stream.ts'; 6 + export type { StreamOptions } from './stream.ts'; 5 7 export { workers } from './worker-pool.ts'; 6 8 export { transfer } from './transfer.ts'; 7 9 export type { Runner } from './runner.ts';
+24
src/stream.ts
··· 1 + const STREAM = Symbol.for('moroutine.stream'); 2 + 3 + export interface StreamOptions { 4 + highWaterMark?: number; 5 + } 6 + 7 + export interface StreamMarker<T> { 8 + readonly [STREAM]: { 9 + iterable: AsyncIterable<T>; 10 + options?: StreamOptions; 11 + }; 12 + } 13 + 14 + /** 15 + * Wraps an AsyncIterable for streaming to a worker. 16 + * @param iterable - The async iterable to pipe to the worker. 17 + * @param opts - Optional. highWaterMark controls backpressure buffering (default: 16). 18 + * @returns The iterable, typed as AsyncIterable<T> for transparent moroutine arg use. 19 + */ 20 + export function stream<T>(iterable: AsyncIterable<T>, opts?: StreamOptions): AsyncIterable<T> { 21 + return { [STREAM]: { iterable, options: opts } } as unknown as AsyncIterable<T>; 22 + } 23 + 24 + export { STREAM };
+18
test/stream-wrapper.test.ts
··· 1 + import { describe, it } from 'node:test'; 2 + import assert from 'node:assert/strict'; 3 + 4 + describe('stream()', () => { 5 + it('returns an object with the stream marker', async () => { 6 + const { stream } = await import('../src/stream.ts'); 7 + async function* gen() { yield 1; } 8 + const wrapped = stream(gen()); 9 + assert.ok(Symbol.for('moroutine.stream') in (wrapped as any)); 10 + }); 11 + 12 + it('accepts highWaterMark option', async () => { 13 + const { stream } = await import('../src/stream.ts'); 14 + async function* gen() { yield 1; } 15 + const wrapped = stream(gen(), { highWaterMark: 32 }); 16 + assert.ok(wrapped); 17 + }); 18 + });