Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: Runner accepts array of tasks, update examples to use run([...])

run() now accepts either a single Task or an array of Tasks.
With an array, it returns Promise<[R1, R2, ...]> with proper
tuple typing. Replaces Promise.all([run(a), run(b)]) pattern.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+26 -16
+1 -1
examples/multi-module/main.ts
··· 12 12 using run = workers(1); 13 13 14 14 // All four moroutines from two modules run on the same single worker 15 - const results = await Promise.all([run(square(7)), run(add(3, 4)), run(uppercase('hello')), run(repeat('ab', 3))]); 15 + const results = await run([square(7), add(3, 4), uppercase('hello'), repeat('ab', 3)]); 16 16 17 17 console.log('square(7) =', results[0]); // 49 18 18 console.log('add(3, 4) =', results[1]); // 7
+1 -1
examples/non-blocking/main.ts
··· 20 20 { 21 21 using run = workers(2); 22 22 const start = performance.now(); 23 - const [a, b] = await Promise.all([run(fibonacci(42)), run(fibonacci(41))]); 23 + const [a, b] = await run([fibonacci(42), fibonacci(41)]); 24 24 const elapsed = (performance.now() - start).toFixed(0); 25 25 26 26 clearInterval(interval);
+5 -5
examples/shared-state/main.ts
··· 16 16 using run = workers(4); 17 17 18 18 // 4 workers each move the position (1, 2) per step, 1000 steps each 19 - await Promise.all([ 20 - run(updatePosition(mu, pos, 1, 2, steps)), 21 - run(updatePosition(mu, pos, 1, 2, steps)), 22 - run(updatePosition(mu, pos, 1, 2, steps)), 23 - run(updatePosition(mu, pos, 1, 2, steps)), 19 + await run([ 20 + updatePosition(mu, pos, 1, 2, steps), 21 + updatePosition(mu, pos, 1, 2, steps), 22 + updatePosition(mu, pos, 1, 2, steps), 23 + updatePosition(mu, pos, 1, 2, steps), 24 24 ]); 25 25 } 26 26
+3
src/runner.ts
··· 1 1 import type { Task } from './task.ts'; 2 2 3 + type TaskResults<T extends Task<any>[]> = { [K in keyof T]: T[K] extends Task<infer R> ? R : never }; 4 + 3 5 export type Runner = { 4 6 <T>(task: Task<T>): Promise<T>; 7 + <T extends Task<any>[]>(tasks: [...T]): Promise<TaskResults<T>>; 5 8 [Symbol.dispose](): void; 6 9 };
+12 -5
src/worker-pool.ts
··· 17 17 let next = 0; 18 18 let disposed = false; 19 19 20 + function dispatch<T>(task: Task<T>): Promise<T> { 21 + if (disposed) return Promise.reject(new Error('Worker pool is disposed')); 22 + const worker = pool[next % pool.length]; 23 + next++; 24 + return execute<T>(worker, task.id, task.args); 25 + } 26 + 20 27 const run: Runner = Object.assign( 21 - <T>(task: Task<T>): Promise<T> => { 22 - if (disposed) return Promise.reject(new Error('Worker pool is disposed')); 23 - const worker = pool[next % pool.length]; 24 - next++; 25 - return execute<T>(worker, task.id, task.args); 28 + (taskOrTasks: Task<any> | Task<any>[]): any => { 29 + if (Array.isArray(taskOrTasks)) { 30 + return Promise.all(taskOrTasks.map((t) => dispatch(t))); 31 + } 32 + return dispatch(taskOrTasks); 26 33 }, 27 34 { 28 35 [Symbol.dispose]() {
+1 -1
test/pool.test.ts
··· 17 17 it('handles concurrent calls across pool workers', async () => { 18 18 const run = workers(2); 19 19 try { 20 - const results = await Promise.all([run(double(1)), run(double(2)), run(double(3)), run(double(4))]); 20 + const results = await run([double(1), double(2), double(3), double(4)]); 21 21 assert.deepEqual(results, [2, 4, 6, 8]); 22 22 } finally { 23 23 run[Symbol.dispose]();
+3 -3
test/shared/cross-worker.test.ts
··· 9 9 const counter = int32atomic(); 10 10 const run = workers(2); 11 11 try { 12 - await Promise.all([run(atomicAdd(counter, 10)), run(atomicAdd(counter, 20))]); 12 + await run([atomicAdd(counter, 10), atomicAdd(counter, 20)]); 13 13 assert.equal(counter.load(), 30); 14 14 } finally { 15 15 run[Symbol.dispose](); ··· 21 21 const counter = int32atomic(); 22 22 const run = workers(2); 23 23 try { 24 - await Promise.all([run(mutexIncrement(m, counter, 100)), run(mutexIncrement(m, counter, 100))]); 24 + await run([mutexIncrement(m, counter, 100), mutexIncrement(m, counter, 100)]); 25 25 assert.equal(counter.load(), 200); 26 26 } finally { 27 27 run[Symbol.dispose](); ··· 34 34 counter.store(42); 35 35 const run = workers(2); 36 36 try { 37 - const [a, b] = await Promise.all([run(rwlockRead(rw, counter)), run(rwlockRead(rw, counter))]); 37 + const [a, b] = await run([rwlockRead(rw, counter), rwlockRead(rw, counter)]); 38 38 assert.equal(a, 42); 39 39 assert.equal(b, 42); 40 40 } finally {