Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: WorkerHandle, run.workers, and assign() dispatch support

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+89 -6
+14 -6
src/worker-pool.ts
··· 2 2 import { Worker } from 'node:worker_threads'; 3 3 import { availableParallelism } from 'node:os'; 4 4 import { setupWorker, execute, dispatchStream } from './execute.ts'; 5 - import type { Task } from './task.ts'; 5 + import { Task } from './task.ts'; 6 6 import { StreamTask } from './stream-task.ts'; 7 7 import type { ChannelOptions } from './channel.ts'; 8 8 import type { Runner, WorkerHandle, WorkerOptions } from './runner.ts'; ··· 42 42 pool.length = 0; 43 43 } 44 44 45 - function dispatch<T>(task: Task<T>): Promise<T> { 46 - if (disposed) return Promise.reject(new Error('Worker pool is disposed')); 45 + function resolveWorker(task: Task<any> | StreamTask<any>): Worker { 46 + if (task.worker != null) { 47 + const idx = workerHandles.indexOf(task.worker as WorkerHandle); 48 + if (idx !== -1) return pool[idx]; 49 + } 47 50 const worker = pool[next % pool.length]; 48 51 next++; 52 + return worker; 53 + } 54 + 55 + function dispatch<T>(task: Task<T>): Promise<T> { 56 + if (disposed) return Promise.reject(new Error('Worker pool is disposed')); 57 + const worker = resolveWorker(task); 49 58 return track(execute<T>(worker, task.id, task.args)); 50 59 } 51 60 ··· 64 73 }; 65 74 } 66 75 67 - const workerHandles: readonly WorkerHandle[] = pool.map(makeWorkerHandle); 76 + const workerHandles: readonly WorkerHandle[] = Object.freeze(pool.map(makeWorkerHandle)); 68 77 69 78 const run: Runner = Object.assign( 70 79 (taskOrTasks: Task<any> | Task<any>[] | StreamTask<any>, channelOpts?: ChannelOptions): any => { 71 80 if (taskOrTasks instanceof StreamTask) { 72 81 if (disposed) throw new Error('Worker pool is disposed'); 73 - const worker = pool[next % pool.length]; 74 - next++; 82 + const worker = resolveWorker(taskOrTasks); 75 83 const { iterable, done } = dispatchStream(worker, taskOrTasks.id, taskOrTasks.args, channelOpts); 76 84 track(done); 77 85 return iterable;
+7
test/fixtures/worker-handle.ts
··· 1 + import { mo } from 'moroutine'; 2 + 3 + export const identity = mo(import.meta, (n: number): number => n); 4 + 5 + export const countUp = mo(import.meta, async function* (n: number) { 6 + for (let i = 0; i < n; i++) yield i; 7 + });
+68
test/worker-handle.test.ts
··· 1 + import { describe, it } from 'node:test'; 2 + import assert from 'node:assert/strict'; 3 + import { workers, assign, channel } from 'moroutine'; 4 + import { identity, countUp } from './fixtures/worker-handle.ts'; 5 + 6 + describe('WorkerHandle', () => { 7 + it('run.workers is a frozen array matching pool size', () => { 8 + const run = workers(3); 9 + try { 10 + assert.equal(run.workers.length, 3); 11 + assert.ok(Object.isFrozen(run.workers)); 12 + } finally { 13 + run[Symbol.dispose](); 14 + } 15 + }); 16 + 17 + it('w.exec() dispatches a task to the specific worker', async () => { 18 + const run = workers(2); 19 + try { 20 + const result = await run.workers[0].exec(identity(42)); 21 + assert.equal(result, 42); 22 + } finally { 23 + run[Symbol.dispose](); 24 + } 25 + }); 26 + 27 + it('w.exec() dispatches a streaming task', async () => { 28 + const run = workers(1); 29 + try { 30 + const results: number[] = []; 31 + for await (const n of run.workers[0].exec(countUp(3))) { 32 + results.push(n); 33 + } 34 + assert.deepEqual(results, [0, 1, 2]); 35 + } finally { 36 + run[Symbol.dispose](); 37 + } 38 + }); 39 + 40 + it('assign() pins task to a specific worker via run()', async () => { 41 + const run = workers(2); 42 + try { 43 + const result = await run(assign(run.workers[1], identity(99))); 44 + assert.equal(result, 99); 45 + } finally { 46 + run[Symbol.dispose](); 47 + } 48 + }); 49 + 50 + it('assign() works in a batch', async () => { 51 + const run = workers(2); 52 + try { 53 + const results = await run([ 54 + assign(run.workers[0], identity(1)), 55 + assign(run.workers[1], identity(2)), 56 + ]); 57 + assert.deepEqual(results, [1, 2]); 58 + } finally { 59 + run[Symbol.dispose](); 60 + } 61 + }); 62 + 63 + it('w.exec() rejects after dispose', async () => { 64 + const run = workers(1); 65 + run[Symbol.dispose](); 66 + await assert.rejects(() => run.workers[0].exec(identity(1)), { message: /disposed/ }); 67 + }); 68 + });