Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: rename workerPool() to pool()

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

+63 -63
+4 -4
examples/atomics/main.ts
··· 2 2 // 3 3 // Run: node --experimental-strip-types examples/atomics/main.ts 4 4 5 - import { workerPool, int32atomic } from '../../src/index.ts'; 5 + import { pool, int32atomic } from '../../src/index.ts'; 6 6 import { increment } from './increment.ts'; 7 7 8 8 const counter = int32atomic(); 9 - const pool = workerPool(4); 9 + const run = pool(4); 10 10 11 11 // Fire off 100 increments across 4 workers 12 - await Promise.all(Array.from({ length: 100 }, () => pool(increment(counter)))); 12 + await Promise.all(Array.from({ length: 100 }, () => run(increment(counter)))); 13 13 14 - pool[Symbol.dispose](); 14 + run[Symbol.dispose](); 15 15 16 16 console.log(`Expected: 100`); 17 17 console.log(`Actual: ${counter.load()}`);
+4 -4
examples/non-blocking/main.ts
··· 3 3 // 4 4 // Run: node --experimental-strip-types examples/non-blocking/main.ts 5 5 6 - import { workerPool } from '../../src/index.ts'; 6 + import { pool } from '../../src/index.ts'; 7 7 import { fibonacci } from './fibonacci.ts'; 8 8 9 9 // Tick a counter on the main thread to prove it's not blocked ··· 16 16 console.log('Computing fibonacci(42) on a worker pool...'); 17 17 console.log('Meanwhile, the main thread keeps ticking:\n'); 18 18 19 - const pool = workerPool(2); 19 + const run = pool(2); 20 20 const start = performance.now(); 21 - const [a, b] = await Promise.all([pool(fibonacci(42)), pool(fibonacci(41))]); 21 + const [a, b] = await Promise.all([run(fibonacci(42)), run(fibonacci(41))]); 22 22 const elapsed = (performance.now() - start).toFixed(0); 23 - pool[Symbol.dispose](); 23 + run[Symbol.dispose](); 24 24 25 25 clearInterval(interval); 26 26 console.log(`\n\nResults: fib(42)=${a}, fib(41)=${b}`);
+4 -4
examples/parallel-batch/main.ts
··· 3 3 // 4 4 // Run: node --experimental-strip-types examples/parallel-batch/main.ts 5 5 6 - import { workerPool } from '../../src/index.ts'; 6 + import { pool } from '../../src/index.ts'; 7 7 import { heavyWork } from './heavy-work.ts'; 8 8 9 9 const items = Array.from({ length: 20 }, (_, i) => i + 1); ··· 20 20 21 21 // Parallel: distribute across a pool of 4 workers 22 22 console.log('Parallel (worker pool, 4 workers)...'); 23 - const pool = workerPool(4); 23 + const run = pool(4); 24 24 const parStart = performance.now(); 25 - const parResults = await Promise.all(items.map((item) => pool(heavyWork(item)))); 25 + const parResults = await Promise.all(items.map((item) => run(heavyWork(item)))); 26 26 const parTime = (performance.now() - parStart).toFixed(0); 27 - pool[Symbol.dispose](); 27 + run[Symbol.dispose](); 28 28 console.log(` ${items.length} items in ${parTime}ms\n`); 29 29 30 30 console.log(`Speedup: ~${(Number(seqTime) / Number(parTime)).toFixed(1)}x`);
+1 -1
src/index.ts
··· 1 1 export { mo } from './mo.ts'; 2 2 export { Task } from './task.ts'; 3 - export { workerPool } from './worker-pool.ts'; 3 + export { pool } from './worker-pool.ts'; 4 4 export { transfer } from './transfer.ts'; 5 5 export type { Runner } from './runner.ts'; 6 6 export {
+1 -1
src/worker-pool.ts
··· 5 5 6 6 const workerEntryUrl = new URL('./worker-entry.ts', import.meta.url); 7 7 8 - export function workerPool(size: number): Runner { 8 + export function pool(size: number): Runner { 9 9 const workers: Worker[] = []; 10 10 for (let i = 0; i < size; i++) { 11 11 const worker = new Worker(workerEntryUrl);
+2 -2
test/error.test.ts
··· 1 1 import { describe, it } from 'node:test'; 2 2 import assert from 'node:assert/strict'; 3 - import { workerPool } from 'moroutine'; 3 + import { pool } from 'moroutine'; 4 4 import { fail } from './fixtures/math.ts'; 5 5 6 6 describe('error handling', () => { ··· 11 11 }); 12 12 13 13 it('rejects with error from pool worker', async () => { 14 - const run = workerPool(1); 14 + const run = pool(1); 15 15 try { 16 16 await assert.rejects(() => run(fail('pool boom')), { 17 17 message: 'pool boom',
+6 -6
test/pool.test.ts
··· 1 1 import { describe, it } from 'node:test'; 2 2 import assert from 'node:assert/strict'; 3 - import { workerPool } from 'moroutine'; 3 + import { pool } from 'moroutine'; 4 4 import { double, add } from './fixtures/math.ts'; 5 5 6 - describe('workerPool', () => { 6 + describe('pool', () => { 7 7 it('executes a moroutine through the pool', async () => { 8 - const run = workerPool(2); 8 + const run = pool(2); 9 9 try { 10 10 const result = await run(double(2)); 11 11 assert.equal(result, 4); ··· 15 15 }); 16 16 17 17 it('handles concurrent calls across pool workers', async () => { 18 - const run = workerPool(2); 18 + const run = pool(2); 19 19 try { 20 20 const results = await Promise.all([run(double(1)), run(double(2)), run(double(3)), run(double(4))]); 21 21 assert.deepEqual(results, [2, 4, 6, 8]); ··· 25 25 }); 26 26 27 27 it('handles multiple argument moroutines', async () => { 28 - const run = workerPool(1); 28 + const run = pool(1); 29 29 try { 30 30 const result = await run(add(10, 20)); 31 31 assert.equal(result, 30); ··· 35 35 }); 36 36 37 37 it('dispose terminates pool workers', async () => { 38 - const run = workerPool(2); 38 + const run = pool(2); 39 39 await run(double(1)); 40 40 run[Symbol.dispose](); 41 41 // After dispose, calls should fail
+28 -28
test/shared/cross-worker.test.ts
··· 1 1 import { describe, it } from 'node:test'; 2 2 import assert from 'node:assert/strict'; 3 - import { workerPool, int32atomic, mutex, rwlock, shared, int32, string, bytes } from 'moroutine'; 3 + import { pool, int32atomic, mutex, rwlock, shared, int32, string, bytes } from 'moroutine'; 4 4 import { atomicAdd, mutexIncrement, rwlockRead } from '../fixtures/sync.ts'; 5 5 import { readValue, writeValue } from '../fixtures/shared-new.ts'; 6 6 7 7 describe('shared primitives across workers', () => { 8 8 it('Int32Atomic is shared across worker threads', async () => { 9 9 const counter = int32atomic(); 10 - const pool = workerPool(2); 10 + const run = pool(2); 11 11 try { 12 - await Promise.all([pool(atomicAdd(counter, 10)), pool(atomicAdd(counter, 20))]); 12 + await Promise.all([run(atomicAdd(counter, 10)), run(atomicAdd(counter, 20))]); 13 13 assert.equal(counter.load(), 30); 14 14 } finally { 15 - pool[Symbol.dispose](); 15 + run[Symbol.dispose](); 16 16 } 17 17 }); 18 18 19 19 it('Mutex serializes access across workers', async () => { 20 20 const m = mutex(); 21 21 const counter = int32atomic(); 22 - const pool = workerPool(2); 22 + const run = pool(2); 23 23 try { 24 - await Promise.all([pool(mutexIncrement(m, counter, 100)), pool(mutexIncrement(m, counter, 100))]); 24 + await Promise.all([run(mutexIncrement(m, counter, 100)), run(mutexIncrement(m, counter, 100))]); 25 25 assert.equal(counter.load(), 200); 26 26 } finally { 27 - pool[Symbol.dispose](); 27 + run[Symbol.dispose](); 28 28 } 29 29 }); 30 30 ··· 32 32 const rw = rwlock(); 33 33 const counter = int32atomic(); 34 34 counter.store(42); 35 - const pool = workerPool(2); 35 + const run = pool(2); 36 36 try { 37 - const [a, b] = await Promise.all([pool(rwlockRead(rw, counter)), pool(rwlockRead(rw, counter))]); 37 + const [a, b] = await Promise.all([run(rwlockRead(rw, counter)), run(rwlockRead(rw, counter))]); 38 38 assert.equal(a, 42); 39 39 assert.equal(b, 42); 40 40 } finally { 41 - pool[Symbol.dispose](); 41 + run[Symbol.dispose](); 42 42 } 43 43 }); 44 44 ··· 46 46 const point = shared({ x: int32, y: int32 }); 47 47 point.store({ x: 1, y: 2 }); 48 48 49 - const pool = workerPool(1); 49 + const run = pool(1); 50 50 try { 51 - const result = await pool(readValue(point)); 51 + const result = await run(readValue(point)); 52 52 assert.deepEqual(result, { x: 1, y: 2 }); 53 53 } finally { 54 - pool[Symbol.dispose](); 54 + run[Symbol.dispose](); 55 55 } 56 56 }); 57 57 58 58 it('worker can write to struct via shared()', async () => { 59 59 const point = shared({ x: int32, y: int32 }); 60 60 61 - const pool = workerPool(1); 61 + const run = pool(1); 62 62 try { 63 - await pool(writeValue(point, { x: 10, y: 20 })); 63 + await run(writeValue(point, { x: 10, y: 20 })); 64 64 assert.deepEqual(point.load(), { x: 10, y: 20 }); 65 65 } finally { 66 - pool[Symbol.dispose](); 66 + run[Symbol.dispose](); 67 67 } 68 68 }); 69 69 ··· 71 71 const t = shared([int32, int32]); 72 72 t.store([1, 2]); 73 73 74 - const pool = workerPool(1); 74 + const run = pool(1); 75 75 try { 76 - const result = await pool(readValue(t)); 76 + const result = await run(readValue(t)); 77 77 assert.deepEqual(result, [1, 2]); 78 78 } finally { 79 - pool[Symbol.dispose](); 79 + run[Symbol.dispose](); 80 80 } 81 81 }); 82 82 ··· 84 84 const s = shared(string(32)); 85 85 s.store('hello'); 86 86 87 - const pool = workerPool(1); 87 + const run = pool(1); 88 88 try { 89 - const result = await pool(readValue(s)); 89 + const result = await run(readValue(s)); 90 90 assert.equal(result, 'hello'); 91 91 } finally { 92 - pool[Symbol.dispose](); 92 + run[Symbol.dispose](); 93 93 } 94 94 }); 95 95 ··· 97 97 const b = shared(bytes(4)); 98 98 b.store(new Uint8Array([1, 2, 3, 4])); 99 99 100 - const pool = workerPool(1); 100 + const run = pool(1); 101 101 try { 102 - const result = (await pool(readValue(b))) as Uint8Array; 102 + const result = (await run(readValue(b))) as Uint8Array; 103 103 assert.deepEqual([...result], [1, 2, 3, 4]); 104 104 } finally { 105 - pool[Symbol.dispose](); 105 + run[Symbol.dispose](); 106 106 } 107 107 }); 108 108 ··· 110 110 const entity = shared({ name: string(16), hp: int32 }); 111 111 entity.store({ name: 'goblin', hp: 50 }); 112 112 113 - const pool = workerPool(1); 113 + const run = pool(1); 114 114 try { 115 - const result = await pool(readValue(entity)); 115 + const result = await run(readValue(entity)); 116 116 assert.deepEqual(result, { name: 'goblin', hp: 50 }); 117 117 } finally { 118 - pool[Symbol.dispose](); 118 + run[Symbol.dispose](); 119 119 } 120 120 }); 121 121 });
+13 -13
test/transfer.test.ts
··· 1 1 import { describe, it } from 'node:test'; 2 2 import assert from 'node:assert/strict'; 3 - import { workerPool, transfer } from 'moroutine'; 3 + import { pool, transfer } from 'moroutine'; 4 4 import { sumBuffer, sumUint8, makeBuffer } from './fixtures/transfer.ts'; 5 5 6 6 describe('transfer', () => { ··· 12 12 view[2] = 3; 13 13 view[3] = 4; 14 14 15 - const pool = workerPool(1); 15 + const run = pool(1); 16 16 try { 17 - const sum = await pool(sumBuffer(transfer(buf))); 17 + const sum = await run(sumBuffer(transfer(buf))); 18 18 assert.equal(sum, 10); 19 19 // Original buffer should be detached after transfer 20 20 assert.equal(buf.byteLength, 0); 21 21 } finally { 22 - pool[Symbol.dispose](); 22 + run[Symbol.dispose](); 23 23 } 24 24 }); 25 25 ··· 27 27 const arr = new Uint8Array([5, 10, 15, 20]); 28 28 const originalBuffer = arr.buffer; 29 29 30 - const pool = workerPool(1); 30 + const run = pool(1); 31 31 try { 32 - const sum = await pool(sumUint8(transfer(arr))); 32 + const sum = await run(sumUint8(transfer(arr))); 33 33 assert.equal(sum, 50); 34 34 // Underlying buffer should be detached after transfer 35 35 assert.equal(originalBuffer.byteLength, 0); 36 36 } finally { 37 - pool[Symbol.dispose](); 37 + run[Symbol.dispose](); 38 38 } 39 39 }); 40 40 41 41 it('auto-transfers return values from worker (zero-copy)', async () => { 42 - const pool = workerPool(1); 42 + const run = pool(1); 43 43 try { 44 - const buf = await pool(makeBuffer(4)); 44 + const buf = await run(makeBuffer(4)); 45 45 const view = new Uint8Array(buf); 46 46 assert.equal(buf.byteLength, 4); 47 47 assert.deepEqual([...view], [1, 2, 3, 4]); 48 48 } finally { 49 - pool[Symbol.dispose](); 49 + run[Symbol.dispose](); 50 50 } 51 51 }); 52 52 ··· 58 58 view[2] = 3; 59 59 view[3] = 4; 60 60 61 - const pool = workerPool(1); 61 + const run = pool(1); 62 62 try { 63 - const sum = await pool(sumBuffer(buf)); 63 + const sum = await run(sumBuffer(buf)); 64 64 assert.equal(sum, 10); 65 65 // Original buffer should NOT be detached (it was copied) 66 66 assert.equal(buf.byteLength, 4); 67 67 } finally { 68 - pool[Symbol.dispose](); 68 + run[Symbol.dispose](); 69 69 } 70 70 }); 71 71 });