Offload functions to worker threads with shared memory primitives for Node.js.
docs: load balancing implementation plan

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

docs/superpowers/plans/2026-04-14-load-balancing.md
# Extensible Load Balancing Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Support pluggable load balancing strategies for worker pools, with built-in round-robin and least-busy implementations.

**Architecture:** A `Balancer` interface with a `select()` method chooses which worker runs each task. `WorkerHandle` exposes `thread` and `activeCount` for balancers to read. Built-in `roundRobin()` and `leastBusy()` are exported as factories. The `workers()` function accepts overloaded signatures so opts can be passed without size.

**Tech Stack:** TypeScript (erasable syntax only), worker_threads, node:test.

---

### Task 1: Update Types — Balancer, WorkerHandle, WorkerOptions, workers() overloads

**Files:**
- Modify: `src/runner.ts`

- [ ] **Step 1: Update runner.ts**

Replace the entire contents of `src/runner.ts` with:

```ts
import type { Worker } from 'node:worker_threads';
import type { Task } from './task.ts';
import type { StreamTask } from './stream-task.ts';
import type { ChannelOptions } from './channel.ts';

type TaskResults<T extends Task<any>[]> = { [K in keyof T]: T[K] extends Task<infer R> ? R : never };

/** A load balancing strategy for choosing which worker runs a task. */
export interface Balancer {
  /** Choose a worker for the given task. Called synchronously on every dispatch. */
  select(workers: readonly WorkerHandle[], task: Task<any> | StreamTask<any>): WorkerHandle;
  /** Optional cleanup on sync dispose. */
  [Symbol.dispose]?(): void;
  /** Optional cleanup on async dispose. */
  [Symbol.asyncDispose]?(): Promise<void>;
}

/** Options for configuring a worker pool. */
export interface WorkerOptions {
  /** Maximum time in ms to wait for in-flight tasks during async dispose. If exceeded, workers are force-terminated. */
  shutdownTimeout?: number;
  /** Load balancing strategy. Defaults to round-robin. */
  balance?: Balancer;
}

/** A handle to a specific worker in a pool. */
export interface WorkerHandle {
  /** Dispatches a task pinned to this worker. */
  exec<T>(task: Task<T>): Promise<T>;
  /** Dispatches a streaming task pinned to this worker. */
  exec<T>(task: StreamTask<T>, opts?: ChannelOptions): AsyncIterable<T>;
  /** The underlying worker thread. */
  readonly thread: Worker;
  /** Number of currently in-flight tasks on this worker. */
  readonly activeCount: number;
}

/**
 * A callable that dispatches tasks to a worker pool. Disposable via `using` or `[Symbol.dispose]()`.
 *
 * @param task - A single {@link Task} to run on a worker.
 * @returns `Promise<T>` for a single task, `Promise<[...results]>` for a batch, or `AsyncIterable<T>` for a streaming task.
 */
export type Runner = {
  /** Dispatches a single task and returns its result. */
  <T>(task: Task<T>): Promise<T>;
  /** Dispatches a batch of tasks in parallel and returns all results. */
  <T extends Task<any>[]>(tasks: [...T]): Promise<TaskResults<T>>;
  /** Dispatches a streaming task and returns an async iterable of yielded values. */
  <T>(task: StreamTask<T>, opts?: ChannelOptions): AsyncIterable<T>;
  /** AbortSignal that fires when the pool starts disposing. */
  readonly signal: AbortSignal;
  /** Read-only array of worker handles, one per pool worker. */
  readonly workers: readonly WorkerHandle[];
  /** Terminates all workers immediately. */
  [Symbol.dispose](): void;
  /** Aborts signal, waits for in-flight tasks to settle, then terminates workers. */
  [Symbol.asyncDispose](): Promise<void>;
};
```

- [ ] **Step 2: Run type check**

Run: `pnpm tsc --noEmit 2>&1`
Expected: Errors in `worker-pool.ts` because `WorkerHandle` now requires `thread` and `activeCount`. That's expected — Task 3 fixes it.

- [ ] **Step 3: Commit**

```bash
git add src/runner.ts
git commit -m "feat: add Balancer interface, thread and activeCount to WorkerHandle"
```

---

### Task 2: Create Built-in Balancers

**Files:**
- Create: `src/balancers.ts`
- Modify: `src/index.ts`
- Create: `test/balancers.test.ts`

- [ ] **Step 1: Write failing tests**

Create `test/balancers.test.ts`:

```ts
import { describe, it } from 'node:test';
import assert from 'node:assert/strict';
import { roundRobin, leastBusy } from 'moroutine';
import type { WorkerHandle } from 'moroutine';

function mockHandle(activeCount: number): WorkerHandle {
  return { activeCount, thread: {} as any, exec: {} as any };
}

describe('roundRobin()', () => {
  it('cycles through workers in order', () => {
    const b = roundRobin();
    const handles = [mockHandle(0), mockHandle(0), mockHandle(0)];
    const task = { id: 'test', args: [], uid: 0 } as any;
    assert.equal(b.select(handles, task), handles[0]);
    assert.equal(b.select(handles, task), handles[1]);
    assert.equal(b.select(handles, task), handles[2]);
    assert.equal(b.select(handles, task), handles[0]);
  });
});

describe('leastBusy()', () => {
  it('picks worker with lowest activeCount', () => {
    const b = leastBusy();
    const handles = [mockHandle(3), mockHandle(1), mockHandle(2)];
    const task = { id: 'test', args: [], uid: 0 } as any;
    assert.equal(b.select(handles, task), handles[1]);
  });

  it('breaks ties by index', () => {
    const b = leastBusy();
    const handles = [mockHandle(1), mockHandle(1), mockHandle(1)];
    const task = { id: 'test', args: [], uid: 0 } as any;
    assert.equal(b.select(handles, task), handles[0]);
  });
});
```

- [ ] **Step 2: Run test to verify it fails**

Run: `node --no-warnings --test test/balancers.test.ts 2>&1 | tail -10`
Expected: FAIL — `roundRobin` and `leastBusy` not exported.

- [ ] **Step 3: Implement balancers**

Create `src/balancers.ts`:

```ts
import type { Balancer, WorkerHandle } from './runner.ts';
import type { Task } from './task.ts';
import type { StreamTask } from './stream-task.ts';

/**
 * Creates a round-robin balancer that cycles through workers in order.
 * @returns A fresh Balancer instance.
 */
export function roundRobin(): Balancer {
  let next = 0;
  return {
    select(workers: readonly WorkerHandle[]): WorkerHandle {
      const worker = workers[next % workers.length];
      next++;
      return worker;
    },
  };
}

/**
 * Creates a least-busy balancer that picks the worker with the lowest activeCount.
 * Ties are broken by index (first wins).
 * @returns A fresh Balancer instance.
 */
export function leastBusy(): Balancer {
  return {
    select(workers: readonly WorkerHandle[]): WorkerHandle {
      let best = workers[0];
      for (let i = 1; i < workers.length; i++) {
        if (workers[i].activeCount < best.activeCount) {
          best = workers[i];
        }
      }
      return best;
    },
  };
}
```

- [ ] **Step 4: Export from index.ts**

Add to `src/index.ts`, after the `assign` export:

```ts
export { roundRobin, leastBusy } from './balancers.ts';
```

Also add `Balancer` to the type export:

Change:
```ts
export type { Runner, WorkerHandle, WorkerOptions } from './runner.ts';
```
to:
```ts
export type { Balancer, Runner, WorkerHandle, WorkerOptions } from './runner.ts';
```

- [ ] **Step 5: Run tests**

Run: `node --no-warnings --test test/balancers.test.ts 2>&1 | tail -10`
Expected: All tests pass.

- [ ] **Step 6: Commit**

```bash
git add src/balancers.ts src/index.ts test/balancers.test.ts
git commit -m "feat: roundRobin() and leastBusy() balancer factories"
```

---

### Task 3: Integrate Balancer into worker-pool.ts

Wire balancers into dispatch, add `thread` and `activeCount` to WorkerHandle, overload the `workers()` signature, and dispose the balancer.
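The overloaded `workers()` signature reduces to a small argument-normalization step up front. A minimal sketch of that pattern, using a stand-in default size in place of `os.availableParallelism()` and a pared-down options shape (`PoolOpts`, `normalize`, and `DEFAULT_SIZE` are illustrative names, not the library's real API):

```ts
// Sketch of the argument normalization behind an overloaded workers()-style
// signature. PoolOpts and DEFAULT_SIZE are illustrative stand-ins.
interface PoolOpts {
  shutdownTimeout?: number;
  balance?: unknown;
}

const DEFAULT_SIZE = 4; // stand-in for os.availableParallelism()

function normalize(sizeOrOpts?: number | PoolOpts, opts?: PoolOpts): { size: number; opts?: PoolOpts } {
  if (typeof sizeOrOpts === 'object') {
    // Call shape: workers({ ... }) — options passed without a size.
    return { size: DEFAULT_SIZE, opts: sizeOrOpts };
  }
  // Call shapes: workers(), workers(n), workers(n, { ... }).
  return { size: sizeOrOpts ?? DEFAULT_SIZE, opts };
}

console.log(normalize(2).size); // 2
console.log(normalize({ shutdownTimeout: 50 }).size); // 4 (default)
```

Because `typeof null === 'object'`, a caller passing an explicit `null` would hit the opts branch; the plan's signature only allows `undefined`, so TypeScript rules that case out at compile time.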
**Files:**
- Modify: `src/worker-pool.ts`
- Create: `test/load-balancing.test.ts`
- Create: `test/fixtures/load-balancing.ts`

- [ ] **Step 1: Create test fixture**

Create `test/fixtures/load-balancing.ts`:

```ts
import { setTimeout } from 'node:timers/promises';
import { mo } from 'moroutine';

export const identity = mo(import.meta, (n: number): number => n);

export const slow = mo(import.meta, async (ms: number): Promise<string> => {
  await setTimeout(ms);
  return 'done';
});
```

- [ ] **Step 2: Write tests**

Create `test/load-balancing.test.ts`:

```ts
import { describe, it } from 'node:test';
import assert from 'node:assert/strict';
import { workers, leastBusy } from 'moroutine';
import type { Balancer, WorkerHandle } from 'moroutine';
import { identity, slow } from './fixtures/load-balancing.ts';

describe('load balancing', () => {
  it('defaults to round-robin', async () => {
    const run = workers(2);
    try {
      const result = await run(identity(42));
      assert.equal(result, 42);
    } finally {
      run[Symbol.dispose]();
    }
  });

  it('accepts a balancer option', async () => {
    const run = workers(2, { balance: leastBusy() });
    try {
      const result = await run(identity(42));
      assert.equal(result, 42);
    } finally {
      run[Symbol.dispose]();
    }
  });

  it('accepts opts without size', async () => {
    const run = workers({ balance: leastBusy() });
    try {
      assert.ok(run.workers.length > 0);
      const result = await run(identity(42));
      assert.equal(result, 42);
    } finally {
      run[Symbol.dispose]();
    }
  });

  it('exposes thread on WorkerHandle', () => {
    const run = workers(1);
    try {
      assert.ok(run.workers[0].thread);
      assert.equal(typeof run.workers[0].thread.threadId, 'number');
    } finally {
      run[Symbol.dispose]();
    }
  });

  it('tracks activeCount on WorkerHandle', async () => {
    const run = workers(1);
    try {
      assert.equal(run.workers[0].activeCount, 0);
      const promise = run(slow(100));
      assert.equal(run.workers[0].activeCount, 1);
      await promise;
      assert.equal(run.workers[0].activeCount, 0);
    } finally {
      run[Symbol.dispose]();
    }
  });

  it('custom balancer receives task', async () => {
    const seen: string[] = [];
    const custom: Balancer = {
      select(workers, task) {
        seen.push(task.id);
        return workers[0];
      },
    };
    const run = workers(1, { balance: custom });
    try {
      await run(identity(1));
      assert.equal(seen.length, 1);
      assert.ok(seen[0].includes('#'));
    } finally {
      run[Symbol.dispose]();
    }
  });

  it('balancer dispose is called on sync dispose', () => {
    let disposed = false;
    const custom: Balancer = {
      select(workers) { return workers[0]; },
      [Symbol.dispose]() { disposed = true; },
    };
    const run = workers(1, { balance: custom });
    run[Symbol.dispose]();
    assert.ok(disposed);
  });

  it('balancer asyncDispose is called on async dispose', async () => {
    let disposed = false;
    const custom: Balancer = {
      select(workers) { return workers[0]; },
      async [Symbol.asyncDispose]() { disposed = true; },
    };
    const run = workers(1, { balance: custom });
    await run[Symbol.asyncDispose]();
    assert.ok(disposed);
  });

  it('pinned tasks bypass balancer', async () => {
    let called = false;
    const custom: Balancer = {
      select(workers) { called = true; return workers[0]; },
    };
    const run = workers(2, { balance: custom });
    try {
      const { assign } = await import('moroutine');
      await run(assign(run.workers[1], identity(42)));
      assert.ok(!called);
    } finally {
      run[Symbol.dispose]();
    }
  });
});
```

- [ ] **Step 3: Run tests to verify they fail**

Run: `node --no-warnings --test test/load-balancing.test.ts 2>&1 | tail -10`
Expected: FAIL — `workers()` doesn't accept opts as first arg, `thread`/`activeCount` not on handle.

- [ ] **Step 4: Update worker-pool.ts**

Replace `src/worker-pool.ts` with:

```ts
import { setTimeout } from 'node:timers/promises';
import { Worker } from 'node:worker_threads';
import { availableParallelism } from 'node:os';
import { setupWorker, execute, dispatchStream } from './execute.ts';
import { Task } from './task.ts';
import { StreamTask } from './stream-task.ts';
import { roundRobin } from './balancers.ts';
import type { ChannelOptions } from './channel.ts';
import type { Balancer, Runner, WorkerHandle, WorkerOptions } from './runner.ts';

const workerEntryUrl = new URL('./worker-entry.ts', import.meta.url);

/**
 * Creates a pool of worker threads with configurable load balancing.
 * @param sizeOrOpts - Number of worker threads, or an options object. Defaults to `os.availableParallelism()`.
 * @param opts - Optional configuration including shutdown timeout and balancer.
 * @returns A disposable {@link Runner} for dispatching tasks.
 */
export function workers(sizeOrOpts?: number | WorkerOptions, opts?: WorkerOptions): Runner {
  let size: number;
  if (typeof sizeOrOpts === 'object') {
    opts = sizeOrOpts;
    size = availableParallelism();
  } else {
    size = sizeOrOpts ?? availableParallelism();
  }

  const balancer: Balancer = opts?.balance ?? roundRobin();

  const pool: Worker[] = [];
  for (let i = 0; i < size; i++) {
    const worker = new Worker(workerEntryUrl);
    setupWorker(worker);
    worker.unref();
    pool.push(worker);
  }

  let disposed = false;
  const ac = new AbortController();
  const inflight = new Set<Promise<unknown>>();
  const activeCounts = new Map<WorkerHandle, number>();

  function track<T>(handle: WorkerHandle, promise: Promise<T>): Promise<T> {
    inflight.add(promise);
    activeCounts.set(handle, (activeCounts.get(handle) ?? 0) + 1);
    promise.finally(() => {
      inflight.delete(promise);
      activeCounts.set(handle, (activeCounts.get(handle) ?? 1) - 1);
    }).catch(() => {});
    return promise;
  }

  function terminateAll(): void {
    for (const worker of pool) {
      worker.terminate();
    }
    pool.length = 0;
  }

  function disposeBalancer(): void {
    if (Symbol.dispose in balancer) {
      balancer[Symbol.dispose]!();
    } else if (Symbol.asyncDispose in balancer) {
      (balancer[Symbol.asyncDispose]! as () => Promise<void>)();
    }
  }

  async function asyncDisposeBalancer(): Promise<void> {
    if (Symbol.asyncDispose in balancer) {
      await balancer[Symbol.asyncDispose]!();
    } else if (Symbol.dispose in balancer) {
      balancer[Symbol.dispose]!();
    }
  }

  function resolveWorkerAndHandle(task: Task<any> | StreamTask<any>): { worker: Worker; handle: WorkerHandle } {
    if (task.worker != null) {
      const idx = workerHandles.indexOf(task.worker as WorkerHandle);
      if (idx !== -1) return { worker: pool[idx], handle: workerHandles[idx] };
    }
    const handle = balancer.select(workerHandles, task);
    const idx = workerHandles.indexOf(handle);
    return { worker: pool[idx], handle };
  }

  function dispatch<T>(task: Task<T>): Promise<T> {
    if (disposed) return Promise.reject(new Error('Worker pool is disposed'));
    const { worker, handle } = resolveWorkerAndHandle(task);
    return track(handle, execute<T>(worker, task.id, task.args));
  }

  function makeWorkerHandle(worker: Worker): WorkerHandle {
    const handle: WorkerHandle = {
      exec<T>(task: Task<T> | StreamTask<T>, channelOpts?: ChannelOptions): any {
        if (task instanceof StreamTask) {
          if (disposed) throw new Error('Worker pool is disposed');
          const { iterable, done } = dispatchStream(worker, task.id, task.args, channelOpts);
          track(handle, done);
          return iterable;
        }
        if (disposed) return Promise.reject(new Error('Worker pool is disposed'));
        return track(handle, execute<T>(worker, task.id, task.args));
      },
      get thread() {
        return worker;
      },
      get activeCount() {
        return activeCounts.get(handle) ?? 0;
      },
    };
    return handle;
  }

  const workerHandles: readonly WorkerHandle[] = Object.freeze(pool.map(makeWorkerHandle));

  const run: Runner = Object.assign(
    (taskOrTasks: Task<any> | Task<any>[] | StreamTask<any>, channelOpts?: ChannelOptions): any => {
      if (taskOrTasks instanceof StreamTask) {
        if (disposed) throw new Error('Worker pool is disposed');
        const { worker, handle } = resolveWorkerAndHandle(taskOrTasks);
        const { iterable, done } = dispatchStream(worker, taskOrTasks.id, taskOrTasks.args, channelOpts);
        track(handle, done);
        return iterable;
      }
      if (Array.isArray(taskOrTasks)) {
        return Promise.all(taskOrTasks.map((t) => dispatch(t)));
      }
      return dispatch(taskOrTasks);
    },
    {
      get signal() {
        return ac.signal;
      },
      get workers() {
        return workerHandles;
      },
      [Symbol.dispose]() {
        disposed = true;
        ac.abort();
        disposeBalancer();
        terminateAll();
      },
      async [Symbol.asyncDispose]() {
        disposed = true;
        ac.abort();
        const settle = Promise.allSettled(inflight);
        if (opts?.shutdownTimeout != null) {
          const timeoutAc = new AbortController();
          await Promise.race([
            settle.finally(() => timeoutAc.abort()),
            setTimeout(opts.shutdownTimeout, undefined, { signal: timeoutAc.signal }).catch(() => {}),
          ]);
        } else {
          await settle;
        }
        await asyncDisposeBalancer();
        terminateAll();
      },
    },
  );

  return run;
}
```

- [ ] **Step 5: Run tests**

Run: `node --no-warnings --test test/load-balancing.test.ts 2>&1 | tail -15`
Expected: All tests pass.

Run: `pnpm test 2>&1 | tail -10`
Expected: All existing tests pass.

Run: `pnpm tsc --noEmit 2>&1`
Expected: No errors.
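The `shutdownTimeout` branch in `[Symbol.asyncDispose]` above races the settle promise against a cancelable timer. The same pattern in isolation, with the race factored into a hypothetical `settleWithTimeout` helper (names and timings are illustrative, not the pool's real internals):

```ts
import { setTimeout as delay } from 'node:timers/promises';

// Isolated sketch of the shutdown race: wait for in-flight work to settle,
// but give up after timeoutMs. Aborting the timer once settle wins keeps the
// cancelled sleep from holding the event loop open.
async function settleWithTimeout(
  inflight: Promise<unknown>[],
  timeoutMs: number,
): Promise<'settled' | 'timed-out'> {
  const ac = new AbortController();
  const settle = Promise.allSettled(inflight).then(() => 'settled' as const);
  const timer = delay(timeoutMs, 'timed-out' as const, { signal: ac.signal })
    .catch(() => 'settled' as const); // AbortError here means settle already won
  return Promise.race([settle.finally(() => ac.abort()), timer]);
}

console.log(await settleWithTimeout([delay(5)], 100)); // settled
console.log(await settleWithTimeout([delay(200)], 20)); // timed-out
```

`Promise.allSettled` (rather than `Promise.all`) matters here: a rejected in-flight task should not short-circuit the drain, and the pool separately swallows rejections via `.catch(() => {})` in `track`.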
- [ ] **Step 6: Commit**

```bash
git add src/worker-pool.ts test/load-balancing.test.ts test/fixtures/load-balancing.ts
git commit -m "feat: integrate balancer into worker pool with activeCount and thread"
```

---

### Task 4: Update README

**Files:**
- Modify: `README.md`

- [ ] **Step 1: Update workers() docs**

Read `README.md`. Find the `### \`workers(size)\`` section. Update the heading and description, and add a load balancing subsection.

Change the heading from:
```markdown
### `workers(size)`
```
to:
```markdown
### `workers(size?, opts?)`
```

After the existing Graceful Shutdown subsection, add:

```markdown
#### Load Balancing

The pool uses round-robin scheduling by default. Pass a `balance` option to change the strategy:

\`\`\`ts
import { workers, leastBusy } from 'moroutine';

{
  using run = workers(4, { balance: leastBusy() });
  // tasks dispatched to whichever worker has the fewest in-flight tasks
}
\`\`\`

Built-in balancers:
- `roundRobin()` — cycles through workers in order (default)
- `leastBusy()` — picks the worker with the lowest active task count

Custom balancers implement the `Balancer` interface:

\`\`\`ts
import type { Balancer } from 'moroutine';

const myBalancer: Balancer = {
  select(workers, task) {
    return workers[0]; // always use first worker
  },
};
\`\`\`

Each worker handle exposes `thread` (the underlying `worker_threads.Worker`) and `activeCount` for building custom strategies.
```

- [ ] **Step 2: Run type check**

Run: `pnpm tsc --noEmit 2>&1`
Expected: No errors.
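The README's custom-balancer snippet can be taken further: because `select` receives the task, a strategy can route on task identity. A sketch of a hypothetical "sticky" balancer; `Handle` and `TaskLike` are structural stand-ins for the plan's `WorkerHandle` and `Task` types, and `sticky` is not part of the library:

```ts
// Hypothetical "sticky" balancer: once a task id has been placed, later tasks
// with the same id go to the same worker. Initial placement falls back to
// least-busy. Handle and TaskLike mirror (but are not) the plan's types.
interface Handle { readonly activeCount: number }
interface TaskLike { readonly id: string }

function sticky() {
  const placements = new Map<string, number>();
  return {
    select(workers: readonly Handle[], task: TaskLike): Handle {
      let idx = placements.get(task.id);
      if (idx === undefined) {
        // First sighting of this id: pick the least-busy worker by index.
        idx = workers.reduce(
          (best, w, i) => (w.activeCount < workers[best].activeCount ? i : best),
          0,
        );
        placements.set(task.id, idx);
      }
      return workers[idx];
    },
    [Symbol.dispose]() {
      placements.clear(); // release routing state when the pool is disposed
    },
  };
}

const pool = [{ activeCount: 2 }, { activeCount: 0 }];
const b = sticky();
console.log(b.select(pool, { id: 'job#1' }) === pool[1]); // true: least busy
console.log(b.select(pool, { id: 'job#1' }) === pool[1]); // true: sticky now
```

Implementing `[Symbol.dispose]` here also exercises the pool's balancer-cleanup hooks from Task 3.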
- [ ] **Step 3: Commit**

```bash
git add README.md
git commit -m "docs: document load balancing in README"
```