import { describe, it } from 'node:test'; import { setTimeout as sleep } from 'node:timers/promises'; import assert from 'node:assert/strict'; import { workers, int32atomic } from 'moroutine'; import { countUp, failAfter } from './fixtures/stream-gen.ts'; import { countingGen } from './fixtures/backpressure.ts'; describe('streaming moroutines (worker -> main)', () => { it('iterates yielded values from worker', async () => { const run = workers(1); try { const results: number[] = []; for await (const value of run(countUp(5))) { results.push(value); } assert.deepEqual(results, [0, 1, 2, 3, 4]); } finally { run[Symbol.dispose](); } }); it('supports early break', async () => { const run = workers(1); try { const results: number[] = []; for await (const value of run(countUp(100))) { results.push(value); if (results.length >= 3) break; } assert.deepEqual(results, [0, 1, 2]); } finally { run[Symbol.dispose](); } }); it('propagates errors from worker generator', async () => { const run = workers(1); try { const results: number[] = []; await assert.rejects( async () => { for await (const value of run(failAfter(3))) { results.push(value); } }, { message: 'intentional error' }, ); assert.deepEqual(results, [0, 1, 2]); } finally { run[Symbol.dispose](); } }); it('works on dedicated worker without pool', async () => { const results: number[] = []; for await (const value of countUp(3)) { results.push(value); } assert.deepEqual(results, [0, 1, 2]); }); it('accepts highWaterMark option', async () => { const run = workers(1); try { const results: number[] = []; for await (const value of run(countUp(5), { highWaterMark: 2 })) { results.push(value); } assert.deepEqual(results, [0, 1, 2, 3, 4]); } finally { run[Symbol.dispose](); } }); it('honors highWaterMark: slow consumer stalls the producer', async () => { using run = workers(1); const highWaterMark = 4; const emitted = int32atomic(); let consumed = 0; let peak = 0; for await (const _ of run(countingGen(50, emitted), { highWaterMark })) { consumed++; const gap = emitted.load() - consumed; if (gap > peak) peak = gap; await sleep(2); } // With highWater=4, worst-case observable gap is ~highWater + 2 slop. assert.ok(peak <= highWaterMark + 3, `expected peak gap <= ${highWaterMark + 3}, got ${peak}`); }); });