import { describe, it } from 'node:test'; import assert from 'node:assert/strict'; import { workers, channel, int32atomic } from 'moroutine'; import { sumStream, uppercaseStream } from './fixtures/stream-input.ts'; import { slowSumPeakGap } from './fixtures/backpressure.ts'; describe('channel() input (main -> worker)', () => { it('pipes async iterable to worker', async () => { async function* numbers() { yield 1; yield 2; yield 3; } const run = workers(1); try { const result = await run(sumStream(channel(numbers()))); assert.equal(result, 6); } finally { run[Symbol.dispose](); } }); it('bidirectional: channel input + generator output', async () => { async function* words() { yield 'hello'; yield 'world'; } const run = workers(1); try { const results: string[] = []; for await (const word of run(uppercaseStream(channel(words())))) { results.push(word); } assert.deepEqual(results, ['HELLO', 'WORLD']); } finally { run[Symbol.dispose](); } }); it('accepts highWaterMark on channel()', async () => { async function* numbers() { yield 1; yield 2; yield 3; } const run = workers(1); try { const result = await run(sumStream(channel(numbers(), { highWaterMark: 2 }))); assert.equal(result, 6); } finally { run[Symbol.dispose](); } }); it('honors channel highWaterMark: parent producer stalls for slow worker', async () => { using run = workers(1); const highWaterMark = 4; const emitted = int32atomic(); async function* source() { for (let i = 0; i < 50; i++) { yield i; emitted.add(1); } } const peak = await run(slowSumPeakGap(channel(source(), { highWaterMark }), emitted, 2)); assert.ok(peak <= highWaterMark + 3, `expected peak gap <= ${highWaterMark + 3}, got ${peak}`); }); });