import { describe, it } from 'node:test';
import assert from 'node:assert/strict';
import { inert, map, workers } from 'moroutine';
import { delayedSquare, toUpper, waitAborted } from './fixtures/map.ts';

/**
 * Tests for `map`: feeds an iterable of tasks to a worker runner and consumes
 * the results as an async iterable.
 */
describe('map', () => {
  // Results are compared after sorting, so arrival order is not asserted here.
  it('dispatches tasks and yields results', async () => {
    using run = workers(2);
    async function* tasks() {
      for (const n of [1, 2, 3, 4]) yield inert(delayedSquare(n, 10));
    }

    const results: number[] = [];
    for await (const r of map(run, tasks(), { concurrency: 2 })) {
      results.push(r);
    }

    // Sort a copy so the collected results are left untouched.
    assert.deepEqual(
      [...results].sort((a, b) => a - b),
      [1, 4, 9, 16],
    );
  });

  it('respects the concurrency limit', async () => {
    using run = workers(4);
    let active = 0;
    let peak = 0;
    // NOTE(review): `active` is incremented before each yield and decremented
    // as soon as the generator is resumed, so it tracks generator suspension
    // rather than tasks running on workers. With this accounting `peak` looks
    // like it can never exceed 1 — confirm this measures what is intended.
    async function* tasks() {
      for (let i = 0; i < 8; i++) {
        active++;
        peak = Math.max(peak, active);
        yield inert(delayedSquare(i, 30));
        active--;
      }
    }

    const results: number[] = [];
    for await (const r of map(run, tasks(), { concurrency: 2 })) {
      results.push(r);
    }

    assert.equal(results.length, 8);
    assert.ok(peak <= 2, `expected peak in-flight <= 2, got ${peak}`);
  });

  it('yields the union of result types for mixed tasks', async () => {
    using run = workers(2);
    async function* tasks() {
      yield inert(delayedSquare(3, 5));
      yield inert(toUpper('hi'));
      yield inert(delayedSquare(4, 5));
      yield inert(toUpper('there'));
    }

    // The element type must accept both kinds of result yielded by `map`;
    // the assertions below expect two numbers and two strings.
    const results: Array<number | string> = [];
    for await (const r of map(run, tasks(), { concurrency: 2 })) {
      results.push(r);
    }

    assert.equal(results.length, 4);
    assert.equal(results.filter((r) => typeof r === 'number').length, 2);
    assert.equal(results.filter((r) => typeof r === 'string').length, 2);
  });

  it('stops iteration when signal aborts', async () => {
    using run = workers(2);
    const ac = new AbortController();
    async function* tasks() {
      for (let i = 0; i < 100; i++) yield inert(waitAborted(i, ac.signal));
    }

    // Abort shortly after iteration has started.
    setTimeout(() => ac.abort(), 30);

    const results: number[] = [];
    await assert.rejects(
      async () => {
        for await (const r of map(run, tasks(), { concurrency: 4, signal: ac.signal })) {
          results.push(r);
        }
      },
      // The iteration must reject with an error named 'AbortError'.
      (err: Error) => err.name === 'AbortError',
    );
  });

  it('accepts a sync iterable of tasks', async () => {
    using run = workers(2);
    const tasks = [1, 2, 3].map((n) => inert(delayedSquare(n, 5)));

    const results: number[] = [];
    for await (const r of map(run, tasks, { concurrency: 2 })) {
      results.push(r);
    }

    assert.deepEqual(
      [...results].sort((a, b) => a - b),
      [1, 4, 9],
    );
  });

  it('accepts a sync generator of tasks', async () => {
    using run = workers(2);
    function* gen() {
      for (const n of [2, 4, 6]) yield inert(delayedSquare(n, 5));
    }

    const results: number[] = [];
    for await (const r of map(run, gen(), { concurrency: 2 })) {
      results.push(r);
    }

    assert.deepEqual(
      [...results].sort((a, b) => a - b),
      [4, 16, 36],
    );
  });

  it('defaults concurrency to 1', async () => {
    using run = workers(2);
    // Records the order in which the generator produced each task.
    const order: number[] = [];
    async function* tasks() {
      for (const n of [0, 1, 2]) {
        order.push(n);
        yield inert(delayedSquare(n, 20));
      }
    }

    const results: number[] = [];
    for await (const r of map(run, tasks())) {
      results.push(r);
    }

    // With concurrency=1, the generator shouldn't get ahead of consumption —
    // all 3 are produced sequentially, so results arrive in input order.
    assert.deepEqual(results, [0, 1, 4]);
    assert.deepEqual(order, [0, 1, 2]);
  });
});