Offload functions to worker threads with shared memory primitives for Node.js.
1import { describe, it } from 'node:test';
2import assert from 'node:assert/strict';
3import { workers } from 'moroutine';
4import { countUp, failAfter } from './fixtures/stream-gen.ts';
5
6describe('streaming moroutines (worker -> main)', () => {
7 it('iterates yielded values from worker', async () => {
8 const run = workers(1);
9 try {
10 const results: number[] = [];
11 for await (const value of run(countUp(5))) {
12 results.push(value);
13 }
14 assert.deepEqual(results, [0, 1, 2, 3, 4]);
15 } finally {
16 run[Symbol.dispose]();
17 }
18 });
19
20 it('supports early break', async () => {
21 const run = workers(1);
22 try {
23 const results: number[] = [];
24 for await (const value of run(countUp(100))) {
25 results.push(value);
26 if (results.length >= 3) break;
27 }
28 assert.deepEqual(results, [0, 1, 2]);
29 } finally {
30 run[Symbol.dispose]();
31 }
32 });
33
34 it('propagates errors from worker generator', async () => {
35 const run = workers(1);
36 try {
37 const results: number[] = [];
38 await assert.rejects(
39 async () => {
40 for await (const value of run(failAfter(3))) {
41 results.push(value);
42 }
43 },
44 { message: 'intentional error' },
45 );
46 assert.deepEqual(results, [0, 1, 2]);
47 } finally {
48 run[Symbol.dispose]();
49 }
50 });
51
52 it('works on dedicated worker without pool', async () => {
53 const results: number[] = [];
54 for await (const value of countUp(3)) {
55 results.push(value);
56 }
57 assert.deepEqual(results, [0, 1, 2]);
58 });
59
60 it('accepts highWaterMark option', async () => {
61 const run = workers(1);
62 try {
63 const results: number[] = [];
64 for await (const value of run(countUp(5), { highWaterMark: 2 })) {
65 results.push(value);
66 }
67 assert.deepEqual(results, [0, 1, 2, 3, 4]);
68 } finally {
69 run[Symbol.dispose]();
70 }
71 });
72});