Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: Runner type and workerPool implementation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Devin Ivy f1668293 e39be174

+93
+2
src/index.ts
··· 1 1 export { mo } from './mo.ts'; 2 2 export { Task } from './task.ts'; 3 + export { workerPool } from './worker-pool.ts'; 4 + export type { Runner } from './runner.ts';
+6
src/runner.ts
··· 1 + import type { Task } from './task.ts'; 2 + 3 + export type Runner = { 4 + <T>(task: Task<T>): Promise<T>; 5 + [Symbol.dispose](): void; 6 + };
+36
src/worker-pool.ts
··· 1 + import { Worker } from 'node:worker_threads'; 2 + import { setupWorker, execute } from './execute.ts'; 3 + import type { Task } from './task.ts'; 4 + import type { Runner } from './runner.ts'; 5 + 6 + const workerEntryUrl = new URL('./worker-entry.ts', import.meta.url); 7 + 8 + export function workerPool(size: number): Runner { 9 + const workers: Worker[] = []; 10 + for (let i = 0; i < size; i++) { 11 + const worker = new Worker(workerEntryUrl); 12 + worker.unref(); 13 + setupWorker(worker); 14 + workers.push(worker); 15 + } 16 + 17 + let next = 0; 18 + let disposed = false; 19 + 20 + const run = <T>(task: Task<T>): Promise<T> => { 21 + if (disposed) return Promise.reject(new Error('Worker pool is disposed')); 22 + const worker = workers[next % workers.length]; 23 + next++; 24 + return execute<T>(worker, task.id, task.args); 25 + }; 26 + 27 + run[Symbol.dispose] = () => { 28 + disposed = true; 29 + for (const worker of workers) { 30 + worker.terminate(); 31 + } 32 + workers.length = 0; 33 + }; 34 + 35 + return run as Runner; 36 + }
+49
test/pool.test.ts
··· 1 + import { describe, it } from 'node:test'; 2 + import assert from 'node:assert/strict'; 3 + import { workerPool } from 'moroutine'; 4 + import { double, add } from './fixtures/math.ts'; 5 + 6 + describe('workerPool', () => { 7 + it('executes a moroutine through the pool', async () => { 8 + const run = workerPool(2); 9 + try { 10 + const result = await run(double(2)); 11 + assert.equal(result, 4); 12 + } finally { 13 + run[Symbol.dispose](); 14 + } 15 + }); 16 + 17 + it('handles concurrent calls across pool workers', async () => { 18 + const run = workerPool(2); 19 + try { 20 + const results = await Promise.all([ 21 + run(double(1)), 22 + run(double(2)), 23 + run(double(3)), 24 + run(double(4)), 25 + ]); 26 + assert.deepEqual(results, [2, 4, 6, 8]); 27 + } finally { 28 + run[Symbol.dispose](); 29 + } 30 + }); 31 + 32 + it('handles multiple argument moroutines', async () => { 33 + const run = workerPool(1); 34 + try { 35 + const result = await run(add(10, 20)); 36 + assert.equal(result, 30); 37 + } finally { 38 + run[Symbol.dispose](); 39 + } 40 + }); 41 + 42 + it('dispose terminates pool workers', async () => { 43 + const run = workerPool(2); 44 + await run(double(1)); 45 + run[Symbol.dispose](); 46 + // After dispose, calls should fail 47 + await assert.rejects(() => run(double(1))); 48 + }); 49 + });