Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at e2cebd539403475e2f4e79d55ca1a84a0ce3510d 122 lines 3.7 kB view raw
1import { describe, it } from 'node:test'; 2import assert from 'node:assert/strict'; 3import { inert, map, workers } from 'moroutine'; 4import { delayedSquare, toUpper, waitAborted } from './fixtures/map.ts'; 5 6describe('map', () => { 7 it('dispatches tasks and yields results', async () => { 8 using run = workers(2); 9 async function* tasks() { 10 for (const n of [1, 2, 3, 4]) yield inert(delayedSquare(n, 10)); 11 } 12 const results: number[] = []; 13 for await (const r of map(run, tasks(), { concurrency: 2 })) { 14 results.push(r); 15 } 16 assert.deepEqual( 17 results.sort((a, b) => a - b), 18 [1, 4, 9, 16], 19 ); 20 }); 21 22 it('respects the concurrency limit', async () => { 23 using run = workers(4); 24 let active = 0; 25 let peak = 0; 26 async function* tasks() { 27 for (let i = 0; i < 8; i++) { 28 active++; 29 peak = Math.max(peak, active); 30 yield inert(delayedSquare(i, 30)); 31 active--; 32 } 33 } 34 const results: number[] = []; 35 for await (const r of map(run, tasks(), { concurrency: 2 })) { 36 results.push(r); 37 } 38 assert.equal(results.length, 8); 39 assert.ok(peak <= 2, `expected peak in-flight <= 2, got ${peak}`); 40 }); 41 42 it('yields the union of result types for mixed tasks', async () => { 43 using run = workers(2); 44 async function* tasks() { 45 yield inert(delayedSquare(3, 5)); 46 yield inert(toUpper('hi')); 47 yield inert(delayedSquare(4, 5)); 48 yield inert(toUpper('there')); 49 } 50 const results: Array<string | number> = []; 51 for await (const r of map(run, tasks(), { concurrency: 2 })) { 52 results.push(r); 53 } 54 assert.equal(results.length, 4); 55 assert.equal(results.filter((r) => typeof r === 'number').length, 2); 56 assert.equal(results.filter((r) => typeof r === 'string').length, 2); 57 }); 58 59 it('stops iteration when signal aborts', async () => { 60 using run = workers(2); 61 const ac = new AbortController(); 62 async function* tasks() { 63 for (let i = 0; i < 100; i++) yield inert(waitAborted(i, ac.signal)); 64 } 65 setTimeout(() => ac.abort(), 30); 66 const results: number[] = []; 67 await assert.rejects( 68 async () => { 69 for await (const r of map(run, tasks(), { concurrency: 4, signal: ac.signal })) { 70 results.push(r); 71 } 72 }, 73 (err: Error) => err.name === 'AbortError', 74 ); 75 }); 76 77 it('accepts a sync iterable of tasks', async () => { 78 using run = workers(2); 79 const tasks = [1, 2, 3].map((n) => inert(delayedSquare(n, 5))); 80 const results: number[] = []; 81 for await (const r of map(run, tasks, { concurrency: 2 })) { 82 results.push(r); 83 } 84 assert.deepEqual( 85 results.sort((a, b) => a - b), 86 [1, 4, 9], 87 ); 88 }); 89 90 it('accepts a sync generator of tasks', async () => { 91 using run = workers(2); 92 function* gen() { 93 for (const n of [2, 4, 6]) yield inert(delayedSquare(n, 5)); 94 } 95 const results: number[] = []; 96 for await (const r of map(run, gen(), { concurrency: 2 })) { 97 results.push(r); 98 } 99 assert.deepEqual( 100 results.sort((a, b) => a - b), 101 [4, 16, 36], 102 ); 103 }); 104 105 it('defaults concurrency to 1', async () => { 106 using run = workers(2); 107 const order: number[] = []; 108 async function* tasks() { 109 for (const n of [0, 1, 2]) { 110 order.push(n); 111 yield inert(delayedSquare(n, 20)); 112 } 113 } 114 const results: number[] = []; 115 for await (const r of map(run, tasks())) { 116 results.push(r); 117 } 118 // With concurrency=1, generator shouldn't get ahead of consumption — all 3 produced sequentially 119 assert.deepEqual(results, [0, 1, 4]); 120 assert.deepEqual(order, [0, 1, 2]); 121 }); 122});