Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: integrate balancer into worker pool with activeCount and thread

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+188 -23
+64 -23
src/worker-pool.ts
··· 4 4 import { setupWorker, execute, dispatchStream } from './execute.ts'; 5 5 import { Task } from './task.ts'; 6 6 import { StreamTask } from './stream-task.ts'; 7 + import { roundRobin } from './balancers.ts'; 7 8 import type { ChannelOptions } from './channel.ts'; 8 - import type { Runner, WorkerHandle, WorkerOptions } from './runner.ts'; 9 + import type { Balancer, Runner, WorkerHandle, WorkerOptions } from './runner.ts'; 9 10 10 11 const workerEntryUrl = new URL('./worker-entry.ts', import.meta.url); 11 12 12 13 /** 13 - * Creates a pool of worker threads that dispatch tasks with round-robin scheduling. 14 - * @param size - Number of worker threads in the pool. Defaults to the number of available CPUs. 15 - * @param opts - Optional configuration including shutdown timeout. 14 + * Creates a pool of worker threads with configurable load balancing. 15 + * @param size - Number of worker threads, or options object. Defaults to `os.availableParallelism()`. 16 + * @param opts - Optional configuration including shutdown timeout and balancer. 16 17 * @returns A disposable {@link Runner} for dispatching tasks. 17 18 */ 18 - export function workers(size: number = availableParallelism(), opts?: WorkerOptions): Runner { 19 + export function workers(sizeOrOpts?: number | WorkerOptions, opts?: WorkerOptions): Runner { 20 + let size: number; 21 + if (typeof sizeOrOpts === 'object') { 22 + opts = sizeOrOpts; 23 + size = availableParallelism(); 24 + } else { 25 + size = sizeOrOpts ?? availableParallelism(); 26 + } 27 + 28 + const balancer: Balancer = opts?.balance ?? roundRobin(); 29 + 19 30 const pool: Worker[] = []; 20 31 for (let i = 0; i < size; i++) { 21 32 const worker = new Worker(workerEntryUrl); ··· 24 35 pool.push(worker); 25 36 } 26 37 27 - let next = 0; 28 38 let disposed = false; 29 39 const ac = new AbortController(); 30 40 const inflight = new Set<Promise<unknown>>(); 41 + const activeCounts = new Map<WorkerHandle, number>(); 31 42 32 - function track<T>(promise: Promise<T>): Promise<T> { 43 + function track<T>(handle: WorkerHandle, promise: Promise<T>): Promise<T> { 33 44 inflight.add(promise); 34 - promise.finally(() => inflight.delete(promise)).catch(() => {}); 45 + activeCounts.set(handle, (activeCounts.get(handle) ?? 0) + 1); 46 + promise.finally(() => { 47 + inflight.delete(promise); 48 + activeCounts.set(handle, (activeCounts.get(handle) ?? 1) - 1); 49 + }).catch(() => {}); 35 50 return promise; 36 51 } 37 52 ··· 42 57 pool.length = 0; 43 58 } 44 59 45 - function resolveWorker(task: Task<any> | StreamTask<any>): Worker { 60 + function disposeBalancer(): void { 61 + if (Symbol.dispose in balancer) { 62 + balancer[Symbol.dispose]!(); 63 + } else if (Symbol.asyncDispose in balancer) { 64 + (balancer[Symbol.asyncDispose]! as () => Promise<void>)(); 65 + } 66 + } 67 + 68 + async function asyncDisposeBalancer(): Promise<void> { 69 + if (Symbol.asyncDispose in balancer) { 70 + await balancer[Symbol.asyncDispose]!(); 71 + } else if (Symbol.dispose in balancer) { 72 + balancer[Symbol.dispose]!(); 73 + } 74 + } 75 + 76 + function resolveWorkerAndHandle(task: Task<any> | StreamTask<any>): { worker: Worker; handle: WorkerHandle } { 46 77 if (task.worker != null) { 47 78 const idx = workerHandles.indexOf(task.worker as WorkerHandle); 48 - if (idx !== -1) return pool[idx]; 79 + if (idx !== -1) return { worker: pool[idx], handle: workerHandles[idx] }; 49 80 } 50 - const worker = pool[next % pool.length]; 51 - next++; 52 - return worker; 81 + const handle = balancer.select(workerHandles, task); 82 + const idx = workerHandles.indexOf(handle); 83 + return { worker: pool[idx], handle }; 53 84 } 54 85 55 86 function dispatch<T>(task: Task<T>): Promise<T> { 56 87 if (disposed) return Promise.reject(new Error('Worker pool is disposed')); 57 - const worker = resolveWorker(task); 58 - return track(execute<T>(worker, task.id, task.args)); 88 + const { worker, handle } = resolveWorkerAndHandle(task); 89 + return track(handle, execute<T>(worker, task.id, task.args)); 59 90 } 60 91 61 - function makeWorkerHandle(worker: Worker): WorkerHandle { 62 - return { 63 - exec<T>(task: Task<T> | StreamTask<T>, opts?: ChannelOptions): any { 92 + function makeWorkerHandle(worker: Worker, idx: number): WorkerHandle { 93 + let handle: WorkerHandle; 94 + handle = { 95 + exec<T>(task: Task<T> | StreamTask<T>, channelOpts?: ChannelOptions): any { 64 96 if (task instanceof StreamTask) { 65 97 if (disposed) throw new Error('Worker pool is disposed'); 66 - const { iterable, done } = dispatchStream(worker, task.id, task.args, opts); 67 - track(done); 98 + const { iterable, done } = dispatchStream(worker, task.id, task.args, channelOpts); 99 + track(handle, done); 68 100 return iterable; 69 101 } 70 102 if (disposed) return Promise.reject(new Error('Worker pool is disposed')); 71 - return track(execute<T>(worker, task.id, task.args)); 103 + return track(handle, execute<T>(worker, task.id, task.args)); 104 + }, 105 + get thread() { 106 + return worker; 107 + }, 108 + get activeCount() { 109 + return activeCounts.get(handle) ?? 0; 72 110 }, 73 111 }; 112 + return handle; 74 113 } 75 114 76 115 const workerHandles: readonly WorkerHandle[] = Object.freeze(pool.map(makeWorkerHandle)); ··· 79 118 (taskOrTasks: Task<any> | Task<any>[] | StreamTask<any>, channelOpts?: ChannelOptions): any => { 80 119 if (taskOrTasks instanceof StreamTask) { 81 120 if (disposed) throw new Error('Worker pool is disposed'); 82 - const worker = resolveWorker(taskOrTasks); 121 + const { worker, handle } = resolveWorkerAndHandle(taskOrTasks); 83 122 const { iterable, done } = dispatchStream(worker, taskOrTasks.id, taskOrTasks.args, channelOpts); 84 - track(done); 123 + track(handle, done); 85 124 return iterable; 86 125 } 87 126 if (Array.isArray(taskOrTasks)) { ··· 99 138 [Symbol.dispose]() { 100 139 disposed = true; 101 140 ac.abort(); 141 + disposeBalancer(); 102 142 terminateAll(); 103 143 }, 104 144 async [Symbol.asyncDispose]() { ··· 114 154 } else { 115 155 await settle; 116 156 } 157 + await asyncDisposeBalancer(); 117 158 terminateAll(); 118 159 }, 119 160 },
+9
test/fixtures/load-balancing.ts
··· 1 + import { setTimeout } from 'node:timers/promises'; 2 + import { mo } from 'moroutine'; 3 + 4 + export const identity = mo(import.meta, (n: number): number => n); 5 + 6 + export const slow = mo(import.meta, async (ms: number): Promise<string> => { 7 + await setTimeout(ms); 8 + return 'done'; 9 + });
+115
test/load-balancing.test.ts
··· 1 + import { describe, it } from 'node:test'; 2 + import assert from 'node:assert/strict'; 3 + import { workers, assign, leastBusy } from 'moroutine'; 4 + import type { Balancer } from 'moroutine'; 5 + import { identity, slow } from './fixtures/load-balancing.ts'; 6 + 7 + describe('load balancing', () => { 8 + it('defaults to round-robin', async () => { 9 + const run = workers(2); 10 + try { 11 + const result = await run(identity(42)); 12 + assert.equal(result, 42); 13 + } finally { 14 + run[Symbol.dispose](); 15 + } 16 + }); 17 + 18 + it('accepts a balancer option', async () => { 19 + const run = workers(2, { balance: leastBusy() }); 20 + try { 21 + const result = await run(identity(42)); 22 + assert.equal(result, 42); 23 + } finally { 24 + run[Symbol.dispose](); 25 + } 26 + }); 27 + 28 + it('accepts opts without size', async () => { 29 + const run = workers({ balance: leastBusy() }); 30 + try { 31 + assert.ok(run.workers.length > 0); 32 + const result = await run(identity(42)); 33 + assert.equal(result, 42); 34 + } finally { 35 + run[Symbol.dispose](); 36 + } 37 + }); 38 + 39 + it('exposes thread on WorkerHandle', () => { 40 + const run = workers(1); 41 + try { 42 + assert.ok(run.workers[0].thread); 43 + assert.equal(typeof run.workers[0].thread.threadId, 'number'); 44 + } finally { 45 + run[Symbol.dispose](); 46 + } 47 + }); 48 + 49 + it('tracks activeCount on WorkerHandle', async () => { 50 + const run = workers(1); 51 + try { 52 + assert.equal(run.workers[0].activeCount, 0); 53 + const promise = run(slow(100)); 54 + assert.equal(run.workers[0].activeCount, 1); 55 + await promise; 56 + assert.equal(run.workers[0].activeCount, 0); 57 + } finally { 58 + run[Symbol.dispose](); 59 + } 60 + }); 61 + 62 + it('custom balancer receives task', async () => { 63 + const seen: string[] = []; 64 + const custom: Balancer = { 65 + select(workers, task) { 66 + seen.push(task.id); 67 + return workers[0]; 68 + }, 69 + }; 70 + const run = workers(1, { balance: custom }); 71 + try { 72 + await run(identity(1)); 73 + assert.equal(seen.length, 1); 74 + assert.ok(seen[0].includes('#')); 75 + } finally { 76 + run[Symbol.dispose](); 77 + } 78 + }); 79 + 80 + it('balancer dispose is called on sync dispose', () => { 81 + let disposed = false; 82 + const custom: Balancer = { 83 + select(workers) { return workers[0]; }, 84 + [Symbol.dispose]() { disposed = true; }, 85 + }; 86 + const run = workers(1, { balance: custom }); 87 + run[Symbol.dispose](); 88 + assert.ok(disposed); 89 + }); 90 + 91 + it('balancer asyncDispose is called on async dispose', async () => { 92 + let disposed = false; 93 + const custom: Balancer = { 94 + select(workers) { return workers[0]; }, 95 + async [Symbol.asyncDispose]() { disposed = true; }, 96 + }; 97 + const run = workers(1, { balance: custom }); 98 + await run[Symbol.asyncDispose](); 99 + assert.ok(disposed); 100 + }); 101 + 102 + it('pinned tasks bypass balancer', async () => { 103 + let called = false; 104 + const custom: Balancer = { 105 + select(workers) { called = true; return workers[0]; }, 106 + }; 107 + const run = workers(2, { balance: custom }); 108 + try { 109 + await run(assign(run.workers[1], identity(42))); 110 + assert.ok(!called); 111 + } finally { 112 + run[Symbol.dispose](); 113 + } 114 + }); 115 + });