import { describe, it } from 'node:test'; import assert from 'node:assert/strict'; import { workers } from 'moroutine'; import { countUp, failAfter } from './fixtures/stream-gen.ts'; describe('streaming moroutines (worker -> main)', () => { it('iterates yielded values from worker', async () => { const run = workers(1); try { const results: number[] = []; for await (const value of run(countUp(5))) { results.push(value); } assert.deepEqual(results, [0, 1, 2, 3, 4]); } finally { run[Symbol.dispose](); } }); it('supports early break', async () => { const run = workers(1); try { const results: number[] = []; for await (const value of run(countUp(100))) { results.push(value); if (results.length >= 3) break; } assert.deepEqual(results, [0, 1, 2]); } finally { run[Symbol.dispose](); } }); it('propagates errors from worker generator', async () => { const run = workers(1); try { const results: number[] = []; await assert.rejects( async () => { for await (const value of run(failAfter(3))) { results.push(value); } }, { message: 'intentional error' }, ); assert.deepEqual(results, [0, 1, 2]); } finally { run[Symbol.dispose](); } }); it('works on dedicated worker without pool', async () => { const results: number[] = []; for await (const value of countUp(3)) { results.push(value); } assert.deepEqual(results, [0, 1, 2]); }); it('accepts highWaterMark option', async () => { const run = workers(1); try { const results: number[] = []; for await (const value of run(countUp(5), { highWaterMark: 2 })) { results.push(value); } assert.deepEqual(results, [0, 1, 2, 3, 4]); } finally { run[Symbol.dispose](); } }); });