Offload functions to worker threads with shared memory primitives for Node.js.
1import { describe, it } from 'node:test';
2import assert from 'node:assert/strict';
3import { workers, channel, int32atomic } from 'moroutine';
4import { sumStream, uppercaseStream } from './fixtures/stream-input.ts';
5import { slowSumPeakGap } from './fixtures/backpressure.ts';
6
// Tests for streaming data from the main thread into a worker task through
// channel(): plain piping, combined input/output streaming, and the
// highWaterMark option.
describe('channel() input (main -> worker)', () => {
  // Source shared by the summing tests. Every call returns a fresh generator,
  // so tests never share iteration state.
  async function* oneToThree() {
    yield 1;
    yield 2;
    yield 3;
  }

  it('pipes async iterable to worker', async () => {
    // `using` invokes run[Symbol.dispose]() when the test scope exits,
    // whether the assertion passes or throws.
    using run = workers(1);
    const result = await run(sumStream(channel(oneToThree())));
    assert.equal(result, 6);
  });

  it('bidirectional: channel input + generator output', async () => {
    async function* words() {
      yield 'hello';
      yield 'world';
    }
    using run = workers(1);
    // The task consumes the channel and is itself async-iterable, so values
    // flow in both directions during one run.
    const results: string[] = [];
    for await (const word of run(uppercaseStream(channel(words())))) {
      results.push(word);
    }
    assert.deepEqual(results, ['HELLO', 'WORLD']);
  });

  it('accepts highWaterMark on channel()', async () => {
    using run = workers(1);
    // Only checks that the option is accepted and the result is unaffected.
    const result = await run(sumStream(channel(oneToThree(), { highWaterMark: 2 })));
    assert.equal(result, 6);
  });

  it('honors channel highWaterMark: parent producer stalls for slow worker', async () => {
    using run = workers(1);
    const highWaterMark = 4;
    // Counter handed to the worker task alongside the channel.
    const emitted = int32atomic();
    async function* source() {
      for (let i = 0; i < 50; i++) {
        yield i;
        // Runs only once the generator is resumed after the yield, i.e. when
        // the next item is requested from this source.
        emitted.add(1);
      }
    }
    // NOTE(review): the trailing `2` is presumably a per-item delay for the
    // slow consumer, and the `+ 3` slack presumably covers items in flight
    // between the threads — confirm both against fixtures/backpressure.ts.
    const maxAllowedGap = highWaterMark + 3;
    const peak = await run(slowSumPeakGap(channel(source(), { highWaterMark }), emitted, 2));
    assert.ok(peak <= maxAllowedGap, `expected peak gap <= ${maxAllowedGap}, got ${peak}`);
  });
});