Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: rename pool() to workers()

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+38 -38
+2 -2
examples/atomics/main.ts
··· 3 3 // 4 4 // Run: node examples/atomics/main.ts 5 5 6 - import { pool, int32atomic } from '../../src/index.ts'; 6 + import { workers, int32atomic } from '../../src/index.ts'; 7 7 import { increment } from './increment.ts'; 8 8 9 9 const counter = int32atomic(); 10 10 11 11 { 12 - using run = pool(4); 12 + using run = workers(4); 13 13 14 14 // Fire off 100 increments across 4 workers 15 15 await Promise.all(Array.from({ length: 100 }, () => run(increment(counter))));
+2 -2
examples/non-blocking/main.ts
··· 4 4 // 5 5 // Run: node examples/non-blocking/main.ts 6 6 7 - import { pool } from '../../src/index.ts'; 7 + import { workers } from '../../src/index.ts'; 8 8 import { fibonacci } from './fibonacci.ts'; 9 9 10 10 // Tick a counter on the main thread to prove it's not blocked ··· 18 18 console.log('Meanwhile, the main thread keeps ticking:\n'); 19 19 20 20 { 21 - using run = pool(2); 21 + using run = workers(2); 22 22 const start = performance.now(); 23 23 const [a, b] = await Promise.all([run(fibonacci(42)), run(fibonacci(41))]); 24 24 const elapsed = (performance.now() - start).toFixed(0);
+2 -2
examples/parallel-batch/main.ts
··· 4 4 // 5 5 // Run: node examples/parallel-batch/main.ts 6 6 7 - import { pool } from '../../src/index.ts'; 7 + import { workers } from '../../src/index.ts'; 8 8 import { heavyWork } from './heavy-work.ts'; 9 9 10 10 const items = Array.from({ length: 20 }, (_, i) => i + 1); ··· 23 23 let parTime: string; 24 24 console.log('Parallel (worker pool, 4 workers)...'); 25 25 { 26 - using run = pool(4); 26 + using run = workers(4); 27 27 const parStart = performance.now(); 28 28 const parResults = await Promise.all(items.map((item) => run(heavyWork(item)))); 29 29 parTime = (performance.now() - parStart).toFixed(0);
+2 -2
examples/shared-state/main.ts
··· 4 4 // 5 5 // Run: node examples/shared-state/main.ts 6 6 7 - import { pool, shared, int32, mutex } from '../../src/index.ts'; 7 + import { workers, shared, int32, mutex } from '../../src/index.ts'; 8 8 import { updatePosition } from './update-position.ts'; 9 9 10 10 const lock = mutex(); ··· 13 13 const steps = 1000; 14 14 15 15 { 16 - using run = pool(4); 16 + using run = workers(4); 17 17 18 18 // 4 workers each move the position (1, 2) per step, 1000 steps each 19 19 await Promise.all([
+1 -1
src/index.ts
··· 1 1 export { mo } from './mo.ts'; 2 2 export { Task } from './task.ts'; 3 - export { pool } from './worker-pool.ts'; 3 + export { workers } from './worker-pool.ts'; 4 4 export { transfer } from './transfer.ts'; 5 5 export type { Runner } from './runner.ts'; 6 6 export {
+6 -6
src/worker-pool.ts
··· 5 5 6 6 const workerEntryUrl = new URL('./worker-entry.ts', import.meta.url); 7 7 8 - export function pool(size: number): Runner { 9 - const workers: Worker[] = []; 8 + export function workers(size: number): Runner { 9 + const pool: Worker[] = []; 10 10 for (let i = 0; i < size; i++) { 11 11 const worker = new Worker(workerEntryUrl); 12 12 worker.unref(); 13 13 setupWorker(worker); 14 - workers.push(worker); 14 + pool.push(worker); 15 15 } 16 16 17 17 let next = 0; ··· 20 20 const run: Runner = Object.assign( 21 21 <T>(task: Task<T>): Promise<T> => { 22 22 if (disposed) return Promise.reject(new Error('Worker pool is disposed')); 23 - const worker = workers[next % workers.length]; 23 + const worker = pool[next % pool.length]; 24 24 next++; 25 25 return execute<T>(worker, task.id, task.args); 26 26 }, 27 27 { 28 28 [Symbol.dispose]() { 29 29 disposed = true; 30 - for (const worker of workers) { 30 + for (const worker of pool) { 31 31 worker.terminate(); 32 32 } 33 - workers.length = 0; 33 + pool.length = 0; 34 34 }, 35 35 }, 36 36 );
+2 -2
test/error.test.ts
··· 1 1 import { describe, it } from 'node:test'; 2 2 import assert from 'node:assert/strict'; 3 - import { pool } from 'moroutine'; 3 + import { workers } from 'moroutine'; 4 4 import { fail } from './fixtures/math.ts'; 5 5 6 6 describe('error handling', () => { ··· 11 11 }); 12 12 13 13 it('rejects with error from pool worker', async () => { 14 - const run = pool(1); 14 + const run = workers(1); 15 15 try { 16 16 await assert.rejects(() => run(fail('pool boom')), { 17 17 message: 'pool boom',
+6 -6
test/pool.test.ts
··· 1 1 import { describe, it } from 'node:test'; 2 2 import assert from 'node:assert/strict'; 3 - import { pool } from 'moroutine'; 3 + import { workers } from 'moroutine'; 4 4 import { double, add } from './fixtures/math.ts'; 5 5 6 - describe('pool', () => { 6 + describe('workers', () => { 7 7 it('executes a moroutine through the pool', async () => { 8 - const run = pool(2); 8 + const run = workers(2); 9 9 try { 10 10 const result = await run(double(2)); 11 11 assert.equal(result, 4); ··· 15 15 }); 16 16 17 17 it('handles concurrent calls across pool workers', async () => { 18 - const run = pool(2); 18 + const run = workers(2); 19 19 try { 20 20 const results = await Promise.all([run(double(1)), run(double(2)), run(double(3)), run(double(4))]); 21 21 assert.deepEqual(results, [2, 4, 6, 8]); ··· 25 25 }); 26 26 27 27 it('handles multiple argument moroutines', async () => { 28 - const run = pool(1); 28 + const run = workers(1); 29 29 try { 30 30 const result = await run(add(10, 20)); 31 31 assert.equal(result, 30); ··· 35 35 }); 36 36 37 37 it('dispose terminates pool workers', async () => { 38 - const run = pool(2); 38 + const run = workers(2); 39 39 await run(double(1)); 40 40 run[Symbol.dispose](); 41 41 // After dispose, calls should fail
+10 -10
test/shared/cross-worker.test.ts
··· 1 1 import { describe, it } from 'node:test'; 2 2 import assert from 'node:assert/strict'; 3 - import { pool, int32atomic, mutex, rwlock, shared, int32, string, bytes } from 'moroutine'; 3 + import { workers, int32atomic, mutex, rwlock, shared, int32, string, bytes } from 'moroutine'; 4 4 import { atomicAdd, mutexIncrement, rwlockRead } from '../fixtures/sync.ts'; 5 5 import { readValue, writeValue } from '../fixtures/shared-new.ts'; 6 6 7 7 describe('shared primitives across workers', () => { 8 8 it('Int32Atomic is shared across worker threads', async () => { 9 9 const counter = int32atomic(); 10 - const run = pool(2); 10 + const run = workers(2); 11 11 try { 12 12 await Promise.all([run(atomicAdd(counter, 10)), run(atomicAdd(counter, 20))]); 13 13 assert.equal(counter.load(), 30); ··· 19 19 it('Mutex serializes access across workers', async () => { 20 20 const m = mutex(); 21 21 const counter = int32atomic(); 22 - const run = pool(2); 22 + const run = workers(2); 23 23 try { 24 24 await Promise.all([run(mutexIncrement(m, counter, 100)), run(mutexIncrement(m, counter, 100))]); 25 25 assert.equal(counter.load(), 200); ··· 32 32 const rw = rwlock(); 33 33 const counter = int32atomic(); 34 34 counter.store(42); 35 - const run = pool(2); 35 + const run = workers(2); 36 36 try { 37 37 const [a, b] = await Promise.all([run(rwlockRead(rw, counter)), run(rwlockRead(rw, counter))]); 38 38 assert.equal(a, 42); ··· 46 46 const point = shared({ x: int32, y: int32 }); 47 47 point.store({ x: 1, y: 2 }); 48 48 49 - const run = pool(1); 49 + const run = workers(1); 50 50 try { 51 51 const result = await run(readValue(point)); 52 52 assert.deepEqual(result, { x: 1, y: 2 }); ··· 58 58 it('worker can write to struct via shared()', async () => { 59 59 const point = shared({ x: int32, y: int32 }); 60 60 61 - const run = pool(1); 61 + const run = workers(1); 62 62 try { 63 63 await run(writeValue(point, { x: 10, y: 20 })); 64 64 assert.deepEqual(point.load(), { x: 10, y: 20 }); ··· 71 71 const t = shared([int32, int32]); 72 72 t.store([1, 2]); 73 73 74 - const run = pool(1); 74 + const run = workers(1); 75 75 try { 76 76 const result = await run(readValue(t)); 77 77 assert.deepEqual(result, [1, 2]); ··· 84 84 const s = shared(string(32)); 85 85 s.store('hello'); 86 86 87 - const run = pool(1); 87 + const run = workers(1); 88 88 try { 89 89 const result = await run(readValue(s)); 90 90 assert.equal(result, 'hello'); ··· 97 97 const b = shared(bytes(4)); 98 98 b.store(new Uint8Array([1, 2, 3, 4])); 99 99 100 - const run = pool(1); 100 + const run = workers(1); 101 101 try { 102 102 const result = (await run(readValue(b))) as Uint8Array; 103 103 assert.deepEqual([...result], [1, 2, 3, 4]); ··· 110 110 const entity = shared({ name: string(16), hp: int32 }); 111 111 entity.store({ name: 'goblin', hp: 50 }); 112 112 113 - const run = pool(1); 113 + const run = workers(1); 114 114 try { 115 115 const result = await run(readValue(entity)); 116 116 assert.deepEqual(result, { name: 'goblin', hp: 50 });
+5 -5
test/transfer.test.ts
··· 1 1 import { describe, it } from 'node:test'; 2 2 import assert from 'node:assert/strict'; 3 - import { pool, transfer } from 'moroutine'; 3 + import { workers, transfer } from 'moroutine'; 4 4 import { sumBuffer, sumUint8, makeBuffer } from './fixtures/transfer.ts'; 5 5 6 6 describe('transfer', () => { ··· 12 12 view[2] = 3; 13 13 view[3] = 4; 14 14 15 - const run = pool(1); 15 + const run = workers(1); 16 16 try { 17 17 const sum = await run(sumBuffer(transfer(buf))); 18 18 assert.equal(sum, 10); ··· 27 27 const arr = new Uint8Array([5, 10, 15, 20]); 28 28 const originalBuffer = arr.buffer; 29 29 30 - const run = pool(1); 30 + const run = workers(1); 31 31 try { 32 32 const sum = await run(sumUint8(transfer(arr))); 33 33 assert.equal(sum, 50); ··· 39 39 }); 40 40 41 41 it('auto-transfers return values from worker (zero-copy)', async () => { 42 - const run = pool(1); 42 + const run = workers(1); 43 43 try { 44 44 const buf = await run(makeBuffer(4)); 45 45 const view = new Uint8Array(buf); ··· 58 58 view[2] = 3; 59 59 view[3] = 4; 60 60 61 - const run = pool(1); 61 + const run = workers(1); 62 62 try { 63 63 const sum = await run(sumBuffer(buf)); 64 64 assert.equal(sum, 10);