Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 90 lines 2.7 kB view raw
1import { describe, it } from 'node:test'; 2import { setTimeout as sleep } from 'node:timers/promises'; 3import assert from 'node:assert/strict'; 4import { workers, int32atomic } from 'moroutine'; 5import { countUp, failAfter } from './fixtures/stream-gen.ts'; 6import { countingGen } from './fixtures/backpressure.ts'; 7 8describe('streaming moroutines (worker -> main)', () => { 9 it('iterates yielded values from worker', async () => { 10 const run = workers(1); 11 try { 12 const results: number[] = []; 13 for await (const value of run(countUp(5))) { 14 results.push(value); 15 } 16 assert.deepEqual(results, [0, 1, 2, 3, 4]); 17 } finally { 18 run[Symbol.dispose](); 19 } 20 }); 21 22 it('supports early break', async () => { 23 const run = workers(1); 24 try { 25 const results: number[] = []; 26 for await (const value of run(countUp(100))) { 27 results.push(value); 28 if (results.length >= 3) break; 29 } 30 assert.deepEqual(results, [0, 1, 2]); 31 } finally { 32 run[Symbol.dispose](); 33 } 34 }); 35 36 it('propagates errors from worker generator', async () => { 37 const run = workers(1); 38 try { 39 const results: number[] = []; 40 await assert.rejects( 41 async () => { 42 for await (const value of run(failAfter(3))) { 43 results.push(value); 44 } 45 }, 46 { message: 'intentional error' }, 47 ); 48 assert.deepEqual(results, [0, 1, 2]); 49 } finally { 50 run[Symbol.dispose](); 51 } 52 }); 53 54 it('works on dedicated worker without pool', async () => { 55 const results: number[] = []; 56 for await (const value of countUp(3)) { 57 results.push(value); 58 } 59 assert.deepEqual(results, [0, 1, 2]); 60 }); 61 62 it('accepts highWaterMark option', async () => { 63 const run = workers(1); 64 try { 65 const results: number[] = []; 66 for await (const value of run(countUp(5), { highWaterMark: 2 })) { 67 results.push(value); 68 } 69 assert.deepEqual(results, [0, 1, 2, 3, 4]); 70 } finally { 71 run[Symbol.dispose](); 72 } 73 }); 74 75 it('honors highWaterMark: slow consumer stalls the producer', async () => { 76 using run = workers(1); 77 const highWaterMark = 4; 78 const emitted = int32atomic(); 79 let consumed = 0; 80 let peak = 0; 81 for await (const _ of run(countingGen(50, emitted), { highWaterMark })) { 82 consumed++; 83 const gap = emitted.load() - consumed; 84 if (gap > peak) peak = gap; 85 await sleep(2); 86 } 87 // With highWater=4, worst-case observable gap is ~highWater + 2 slop. 88 assert.ok(peak <= highWaterMark + 3, `expected peak gap <= ${highWaterMark + 3}, got ${peak}`); 89 }); 90});