Offload functions to worker threads with shared memory primitives for Node.js.
1import { describe, it } from 'node:test';
2import { setTimeout as sleep } from 'node:timers/promises';
3import assert from 'node:assert/strict';
4import { workers, int32atomic } from 'moroutine';
5import { countUp, failAfter } from './fixtures/stream-gen.ts';
6import { countingGen } from './fixtures/backpressure.ts';
7
/**
 * Worker -> main streaming tests: each case consumes, with `for await`, the
 * async iterable obtained from a fixture generator — either through a
 * `workers(1)` runner or by iterating the fixture call directly.
 *
 * Every runner is bound with `using`, so its `Symbol.dispose` method is invoked
 * when the test callback exits, whether the assertions pass or throw.
 */
describe('streaming moroutines (worker -> main)', () => {
  it('iterates yielded values from worker', async () => {
    using run = workers(1);
    const results: number[] = [];
    for await (const value of run(countUp(5))) {
      results.push(value);
    }
    assert.deepEqual(results, [0, 1, 2, 3, 4]);
  });

  it('supports early break', async () => {
    using run = workers(1);
    const results: number[] = [];
    for await (const value of run(countUp(100))) {
      results.push(value);
      // Leave the loop long before the 100 requested values are exhausted.
      if (results.length >= 3) break;
    }
    assert.deepEqual(results, [0, 1, 2]);
  });

  it('propagates errors from worker generator', async () => {
    using run = workers(1);
    const results: number[] = [];
    await assert.rejects(
      async () => {
        for await (const value of run(failAfter(3))) {
          results.push(value);
        }
      },
      { message: 'intentional error' },
    );
    // The values received before the rejection must still have been collected.
    assert.deepEqual(results, [0, 1, 2]);
  });

  it('works on dedicated worker without pool', async () => {
    // No runner here: the fixture call is iterated directly.
    const results: number[] = [];
    for await (const value of countUp(3)) {
      results.push(value);
    }
    assert.deepEqual(results, [0, 1, 2]);
  });

  it('accepts highWaterMark option', async () => {
    using run = workers(1);
    const results: number[] = [];
    for await (const value of run(countUp(5), { highWaterMark: 2 })) {
      results.push(value);
    }
    assert.deepEqual(results, [0, 1, 2, 3, 4]);
  });

  it('honors highWaterMark: slow consumer stalls the producer', async () => {
    using run = workers(1);
    const highWaterMark = 4;
    // The worst-case observable gap is estimated at about highWaterMark + 2;
    // the asserted bound allows highWaterMark + 3.
    const maxGap = highWaterMark + 3;
    // Counter handed to the fixture; presumably bumped once per item the
    // generator emits — confirm against ./fixtures/backpressure.ts.
    const emitted = int32atomic();
    let consumed = 0;
    let peak = 0;
    for await (const _ of run(countingGen(50, emitted), { highWaterMark })) {
      consumed++;
      // How far the emitted counter is ahead of what this loop has consumed.
      const gap = emitted.load() - consumed;
      if (gap > peak) peak = gap;
      // Consume slowly so the producer has every opportunity to run ahead.
      await sleep(2);
    }
    assert.ok(peak <= maxGap, `expected peak gap <= ${maxGap}, got ${peak}`);
  });
});