Offload functions to worker threads with shared memory primitives for Node.js.
1import { describe, it } from 'node:test';
2import assert from 'node:assert/strict';
3import { inert, map, workers } from 'moroutine';
4import { delayedSquare, toUpper, waitAborted } from './fixtures/map.ts';
5
6describe('map', () => {
7 it('dispatches tasks and yields results', async () => {
8 using run = workers(2);
9 async function* tasks() {
10 for (const n of [1, 2, 3, 4]) yield inert(delayedSquare(n, 10));
11 }
12 const results: number[] = [];
13 for await (const r of map(run, tasks(), { concurrency: 2 })) {
14 results.push(r);
15 }
16 assert.deepEqual(
17 results.sort((a, b) => a - b),
18 [1, 4, 9, 16],
19 );
20 });
21
22 it('respects the concurrency limit', async () => {
23 using run = workers(4);
24 let active = 0;
25 let peak = 0;
26 async function* tasks() {
27 for (let i = 0; i < 8; i++) {
28 active++;
29 peak = Math.max(peak, active);
30 yield inert(delayedSquare(i, 30));
31 active--;
32 }
33 }
34 const results: number[] = [];
35 for await (const r of map(run, tasks(), { concurrency: 2 })) {
36 results.push(r);
37 }
38 assert.equal(results.length, 8);
39 assert.ok(peak <= 2, `expected peak in-flight <= 2, got ${peak}`);
40 });
41
42 it('yields the union of result types for mixed tasks', async () => {
43 using run = workers(2);
44 async function* tasks() {
45 yield inert(delayedSquare(3, 5));
46 yield inert(toUpper('hi'));
47 yield inert(delayedSquare(4, 5));
48 yield inert(toUpper('there'));
49 }
50 const results: Array<string | number> = [];
51 for await (const r of map(run, tasks(), { concurrency: 2 })) {
52 results.push(r);
53 }
54 assert.equal(results.length, 4);
55 assert.equal(results.filter((r) => typeof r === 'number').length, 2);
56 assert.equal(results.filter((r) => typeof r === 'string').length, 2);
57 });
58
59 it('stops iteration when signal aborts', async () => {
60 using run = workers(2);
61 const ac = new AbortController();
62 async function* tasks() {
63 for (let i = 0; i < 100; i++) yield inert(waitAborted(i, ac.signal));
64 }
65 setTimeout(() => ac.abort(), 30);
66 const results: number[] = [];
67 await assert.rejects(
68 async () => {
69 for await (const r of map(run, tasks(), { concurrency: 4, signal: ac.signal })) {
70 results.push(r);
71 }
72 },
73 (err: Error) => err.name === 'AbortError',
74 );
75 });
76
77 it('accepts a sync iterable of tasks', async () => {
78 using run = workers(2);
79 const tasks = [1, 2, 3].map((n) => inert(delayedSquare(n, 5)));
80 const results: number[] = [];
81 for await (const r of map(run, tasks, { concurrency: 2 })) {
82 results.push(r);
83 }
84 assert.deepEqual(
85 results.sort((a, b) => a - b),
86 [1, 4, 9],
87 );
88 });
89
90 it('accepts a sync generator of tasks', async () => {
91 using run = workers(2);
92 function* gen() {
93 for (const n of [2, 4, 6]) yield inert(delayedSquare(n, 5));
94 }
95 const results: number[] = [];
96 for await (const r of map(run, gen(), { concurrency: 2 })) {
97 results.push(r);
98 }
99 assert.deepEqual(
100 results.sort((a, b) => a - b),
101 [4, 16, 36],
102 );
103 });
104
105 it('defaults concurrency to 1', async () => {
106 using run = workers(2);
107 const order: number[] = [];
108 async function* tasks() {
109 for (const n of [0, 1, 2]) {
110 order.push(n);
111 yield inert(delayedSquare(n, 20));
112 }
113 }
114 const results: number[] = [];
115 for await (const r of map(run, tasks())) {
116 results.push(r);
117 }
118 // With concurrency=1, generator shouldn't get ahead of consumption — all 3 produced sequentially
119 assert.deepEqual(results, [0, 1, 4]);
120 assert.deepEqual(order, [0, 1, 2]);
121 });
122});