Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

docs: per-worker dispatch implementation plan

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+604
+604
docs/superpowers/plans/2026-04-13-per-worker-dispatch.md
··· 1 + # Per-Worker Dispatch Implementation Plan 2 + 3 + > **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. 4 + 5 + **Goal:** Allow pinning tasks to specific workers via `assign()` and `run.workers`, removing the need to know worker count or assume round-robin scheduling. 6 + 7 + **Architecture:** `Task` and `StreamTask` gain an optional `worker` property. `assign(handle, task)` creates a copy with `worker` set. `workers()` builds a frozen `WorkerHandle[]` exposed on the runner. Dispatch checks `task.worker` — if set, dispatches to that worker; if not, round-robin as before. 8 + 9 + **Tech Stack:** TypeScript (erasable syntax only), worker_threads, node:test. 10 + 11 + --- 12 + 13 + ### Task 1: Add `worker` Property to Task and StreamTask 14 + 15 + **Files:** 16 + - Modify: `src/task.ts` 17 + - Modify: `src/stream-task.ts` 18 + 19 + - [ ] **Step 1: Add optional worker property to Task** 20 + 21 + Read `src/task.ts`. Add a `worker` property. The type is `unknown` here to avoid circular imports (the `WorkerHandle` type lives in `src/runner.ts` which imports `Task`). The property is optional and unset by default. 22 + 23 + Add after the existing `readonly args: unknown[];` line: 24 + 25 + ```ts 26 + worker?: unknown; 27 + ``` 28 + 29 + - [ ] **Step 2: Add optional worker property to StreamTask** 30 + 31 + Read `src/stream-task.ts`. Add the same property after `readonly args: unknown[];`: 32 + 33 + ```ts 34 + worker?: unknown; 35 + ``` 36 + 37 + - [ ] **Step 3: Run type check** 38 + 39 + Run: `pnpm tsc --noEmit 2>&1` 40 + Expected: No errors. 41 + 42 + - [ ] **Step 4: Commit** 43 + 44 + ```bash 45 + git add src/task.ts src/stream-task.ts 46 + git commit -m "feat: add optional worker property to Task and StreamTask" 47 + ``` 48 + 49 + --- 50 + 51 + ### Task 2: Define WorkerHandle and Update Runner Type 52 + 53 + **Files:** 54 + - Modify: `src/runner.ts` 55 + - Modify: `src/index.ts` 56 + 57 + - [ ] **Step 1: Add WorkerHandle and update Runner** 58 + 59 + Replace `src/runner.ts` with: 60 + 61 + ```ts 62 + import type { Task } from './task.ts'; 63 + import type { StreamTask } from './stream-task.ts'; 64 + import type { ChannelOptions } from './channel.ts'; 65 + 66 + type TaskResults<T extends Task<any>[]> = { [K in keyof T]: T[K] extends Task<infer R> ? R : never }; 67 + 68 + /** Options for configuring a worker pool. */ 69 + export interface WorkerOptions { 70 + /** Maximum time in ms to wait for in-flight tasks during async dispose. If exceeded, workers are force-terminated. */ 71 + shutdownTimeout?: number; 72 + } 73 + 74 + /** A handle to a specific worker in a pool. */ 75 + export interface WorkerHandle { 76 + /** Dispatches a task pinned to this worker. */ 77 + exec<T>(task: Task<T>): Promise<T>; 78 + /** Dispatches a streaming task pinned to this worker. */ 79 + exec<T>(task: StreamTask<T>, opts?: ChannelOptions): AsyncIterable<T>; 80 + } 81 + 82 + /** 83 + * A callable that dispatches tasks to a worker pool. Disposable via `using` or `[Symbol.dispose]()`. 84 + * 85 + * @param task - A single {@link Task} to run on a worker. 86 + * @returns `Promise<T>` for a single task, `Promise<[...results]>` for a batch, or `AsyncIterable<T>` for a streaming task. 87 + */ 88 + export type Runner = { 89 + /** Dispatches a single task and returns its result. */ 90 + <T>(task: Task<T>): Promise<T>; 91 + /** Dispatches a batch of tasks in parallel and returns all results. */ 92 + <T extends Task<any>[]>(tasks: [...T]): Promise<TaskResults<T>>; 93 + /** Dispatches a streaming task and returns an async iterable of yielded values. */ 94 + <T>(task: StreamTask<T>, opts?: ChannelOptions): AsyncIterable<T>; 95 + /** AbortSignal that fires when the pool starts disposing. */ 96 + readonly signal: AbortSignal; 97 + /** Read-only array of worker handles, one per pool worker. */ 98 + readonly workers: readonly WorkerHandle[]; 99 + /** Terminates all workers immediately. */ 100 + [Symbol.dispose](): void; 101 + /** Aborts signal, waits for in-flight tasks to settle, then terminates workers. */ 102 + [Symbol.asyncDispose](): Promise<void>; 103 + }; 104 + ``` 105 + 106 + - [ ] **Step 2: Export WorkerHandle from index.ts** 107 + 108 + In `src/index.ts`, change: 109 + ```ts 110 + export type { Runner, WorkerOptions } from './runner.ts'; 111 + ``` 112 + to: 113 + ```ts 114 + export type { Runner, WorkerHandle, WorkerOptions } from './runner.ts'; 115 + ``` 116 + 117 + - [ ] **Step 3: Run type check** 118 + 119 + Run: `pnpm tsc --noEmit 2>&1` 120 + Expected: No errors. 121 + 122 + - [ ] **Step 4: Commit** 123 + 124 + ```bash 125 + git add src/runner.ts src/index.ts 126 + git commit -m "feat: add WorkerHandle type and workers property to Runner" 127 + ``` 128 + 129 + --- 130 + 131 + ### Task 3: Create `assign()` Function 132 + 133 + **Files:** 134 + - Create: `src/assign.ts` 135 + - Modify: `src/index.ts` 136 + - Create: `test/assign.test.ts` 137 + 138 + - [ ] **Step 1: Write failing tests** 139 + 140 + Create `test/assign.test.ts`: 141 + 142 + ```ts 143 + import { describe, it } from 'node:test'; 144 + import assert from 'node:assert/strict'; 145 + import { mo, workers, assign } from 'moroutine'; 146 + 147 + const double = mo(import.meta, (n: number): number => n * 2); 148 + 149 + describe('assign()', () => { 150 + it('returns a new task with worker set', () => { 151 + const run = workers(1); 152 + try { 153 + const task = double(5); 154 + const assigned = assign(run.workers[0], task); 155 + assert.notStrictEqual(assigned, task); 156 + assert.equal(assigned.id, task.id); 157 + assert.deepEqual(assigned.args, task.args); 158 + assert.notEqual(assigned.uid, task.uid); 159 + assert.equal(assigned.worker, run.workers[0]); 160 + } finally { 161 + run[Symbol.dispose](); 162 + } 163 + }); 164 + 165 + it('does not modify the original task', () => { 166 + const run = workers(1); 167 + try { 168 + const task = double(5); 169 + assign(run.workers[0], task); 170 + assert.equal(task.worker, undefined); 171 + } finally { 172 + run[Symbol.dispose](); 173 + } 174 + }); 175 + }); 176 + ``` 177 + 178 + - [ ] **Step 2: Run test to verify it fails** 179 + 180 + Run: `node --no-warnings --test test/assign.test.ts 2>&1 | tail -10` 181 + Expected: FAIL — `assign` not exported. 182 + 183 + - [ ] **Step 3: Implement assign()** 184 + 185 + Create `src/assign.ts`: 186 + 187 + ```ts 188 + import { Task } from './task.ts'; 189 + import { StreamTask } from './stream-task.ts'; 190 + import type { WorkerHandle } from './runner.ts'; 191 + 192 + /** 193 + * Returns a copy of the task pinned to a specific worker. 194 + * The original task is unchanged. 195 + * @param worker - The worker handle to pin the task to. 196 + * @param task - The task or streaming task to assign. 197 + * @returns A new task with the same id and args, pinned to the given worker. 198 + */ 199 + export function assign<T>(worker: WorkerHandle, task: Task<T>): Task<T>; 200 + export function assign<T>(worker: WorkerHandle, task: StreamTask<T>): StreamTask<T>; 201 + export function assign(worker: WorkerHandle, task: Task<any> | StreamTask<any>): Task<any> | StreamTask<any> { 202 + if (task instanceof StreamTask) { 203 + const copy = new StreamTask(task.id, task.args); 204 + copy.worker = worker; 205 + return copy; 206 + } 207 + const copy = new Task(task.id, task.args); 208 + copy.worker = worker; 209 + return copy; 210 + } 211 + ``` 212 + 213 + - [ ] **Step 4: Export assign from index.ts** 214 + 215 + Add to `src/index.ts`: 216 + ```ts 217 + export { assign } from './assign.ts'; 218 + ``` 219 + 220 + - [ ] **Step 5: Run tests** 221 + 222 + Run: `node --no-warnings --test test/assign.test.ts 2>&1 | tail -10` 223 + Expected: All tests pass. 224 + 225 + Run: `pnpm tsc --noEmit 2>&1` 226 + Expected: No errors. 227 + 228 + - [ ] **Step 6: Commit** 229 + 230 + ```bash 231 + git add src/assign.ts src/index.ts test/assign.test.ts 232 + git commit -m "feat: assign() for pinning tasks to specific workers" 233 + ``` 234 + 235 + --- 236 + 237 + ### Task 4: Implement WorkerHandle and `run.workers` in worker-pool.ts 238 + 239 + **Files:** 240 + - Modify: `src/worker-pool.ts` 241 + - Create: `test/worker-handle.test.ts` 242 + - Create: `test/fixtures/worker-handle.ts` 243 + 244 + - [ ] **Step 1: Write test fixtures** 245 + 246 + Create `test/fixtures/worker-handle.ts`: 247 + 248 + ```ts 249 + import { mo } from 'moroutine'; 250 + 251 + export const identity = mo(import.meta, (n: number): number => n); 252 + 253 + export const countUp = mo(import.meta, async function* (n: number) { 254 + for (let i = 0; i < n; i++) yield i; 255 + }); 256 + ``` 257 + 258 + - [ ] **Step 2: Write failing tests** 259 + 260 + Create `test/worker-handle.test.ts`: 261 + 262 + ```ts 263 + import { describe, it } from 'node:test'; 264 + import assert from 'node:assert/strict'; 265 + import { workers, assign, channel } from 'moroutine'; 266 + import { identity, countUp } from './fixtures/worker-handle.ts'; 267 + 268 + describe('WorkerHandle', () => { 269 + it('run.workers is a frozen array matching pool size', () => { 270 + const run = workers(3); 271 + try { 272 + assert.equal(run.workers.length, 3); 273 + assert.ok(Object.isFrozen(run.workers)); 274 + } finally { 275 + run[Symbol.dispose](); 276 + } 277 + }); 278 + 279 + it('w.exec() dispatches a task to the specific worker', async () => { 280 + const run = workers(2); 281 + try { 282 + const result = await run.workers[0].exec(identity(42)); 283 + assert.equal(result, 42); 284 + } finally { 285 + run[Symbol.dispose](); 286 + } 287 + }); 288 + 289 + it('w.exec() dispatches a streaming task', async () => { 290 + const run = workers(1); 291 + try { 292 + const results: number[] = []; 293 + for await (const n of run.workers[0].exec(countUp(3))) { 294 + results.push(n); 295 + } 296 + assert.deepEqual(results, [0, 1, 2]); 297 + } finally { 298 + run[Symbol.dispose](); 299 + } 300 + }); 301 + 302 + it('assign() pins task to a specific worker via run()', async () => { 303 + const run = workers(2); 304 + try { 305 + const task = assign(run.workers[1], identity(99)); 306 + const result = await run(task); 307 + assert.equal(result, 99); 308 + } finally { 309 + run[Symbol.dispose](); 310 + } 311 + }); 312 + 313 + it('assign() works in a batch', async () => { 314 + const run = workers(2); 315 + try { 316 + const results = await run([ 317 + assign(run.workers[0], identity(1)), 318 + assign(run.workers[1], identity(2)), 319 + ]); 320 + assert.deepEqual(results, [1, 2]); 321 + } finally { 322 + run[Symbol.dispose](); 323 + } 324 + }); 325 + 326 + it('channel fan-out via assign + run.workers.map', async () => { 327 + const run = workers(2); 328 + try { 329 + const ch = channel(countUp(20)); 330 + const results: number[][] = await run( 331 + run.workers.map((w) => { 332 + return assign(w, identity(0)); 333 + }), 334 + ); 335 + // Just verify it dispatches and returns — identity doesn't consume a channel 336 + assert.equal(results.length, 2); 337 + } finally { 338 + run[Symbol.dispose](); 339 + } 340 + }); 341 + 342 + it('w.exec() rejects after dispose', async () => { 343 + const run = workers(1); 344 + run[Symbol.dispose](); 345 + await assert.rejects(() => run.workers[0].exec(identity(1)), { message: /disposed/ }); 346 + }); 347 + }); 348 + ``` 349 + 350 + - [ ] **Step 3: Run test to verify it fails** 351 + 352 + Run: `node --no-warnings --test test/worker-handle.test.ts 2>&1 | tail -10` 353 + Expected: FAIL — `run.workers` not defined yet. 354 + 355 + - [ ] **Step 4: Implement WorkerHandle and run.workers** 356 + 357 + Replace `src/worker-pool.ts` with: 358 + 359 + ```ts 360 + import { setTimeout } from 'node:timers/promises'; 361 + import { Worker } from 'node:worker_threads'; 362 + import { availableParallelism } from 'node:os'; 363 + import { setupWorker, execute, dispatchStream } from './execute.ts'; 364 + import { Task } from './task.ts'; 365 + import { StreamTask } from './stream-task.ts'; 366 + import type { ChannelOptions } from './channel.ts'; 367 + import type { Runner, WorkerHandle, WorkerOptions } from './runner.ts'; 368 + 369 + const workerEntryUrl = new URL('./worker-entry.ts', import.meta.url); 370 + 371 + /** 372 + * Creates a pool of worker threads that dispatch tasks with round-robin scheduling. 373 + * @param size - Number of worker threads in the pool. Defaults to the number of available CPUs. 374 + * @param opts - Optional configuration including shutdown timeout. 375 + * @returns A disposable {@link Runner} for dispatching tasks. 376 + */ 377 + export function workers(size: number = availableParallelism(), opts?: WorkerOptions): Runner { 378 + const pool: Worker[] = []; 379 + for (let i = 0; i < size; i++) { 380 + const worker = new Worker(workerEntryUrl); 381 + setupWorker(worker); 382 + worker.unref(); 383 + pool.push(worker); 384 + } 385 + 386 + let next = 0; 387 + let disposed = false; 388 + const ac = new AbortController(); 389 + const inflight = new Set<Promise<unknown>>(); 390 + 391 + function track<T>(promise: Promise<T>): Promise<T> { 392 + inflight.add(promise); 393 + promise.finally(() => inflight.delete(promise)); 394 + return promise; 395 + } 396 + 397 + function terminateAll(): void { 398 + for (const worker of pool) { 399 + worker.terminate(); 400 + } 401 + pool.length = 0; 402 + } 403 + 404 + function resolveWorker(task: Task<any> | StreamTask<any>): Worker { 405 + if (task.worker != null) { 406 + const idx = handles.indexOf(task.worker as WorkerHandle); 407 + if (idx !== -1) return pool[idx]; 408 + } 409 + const worker = pool[next % pool.length]; 410 + next++; 411 + return worker; 412 + } 413 + 414 + function dispatch<T>(task: Task<T>): Promise<T> { 415 + if (disposed) return Promise.reject(new Error('Worker pool is disposed')); 416 + const worker = resolveWorker(task); 417 + return track(execute<T>(worker, task.id, task.args)); 418 + } 419 + 420 + function dispatchStreamTask<T>(task: StreamTask<T>, channelOpts?: ChannelOptions): AsyncIterable<T> { 421 + if (disposed) throw new Error('Worker pool is disposed'); 422 + const worker = resolveWorker(task); 423 + const { iterable, done } = dispatchStream<T>(worker, task.id, task.args, channelOpts); 424 + track(done); 425 + return iterable; 426 + } 427 + 428 + const handles: WorkerHandle[] = pool.map((_, idx) => ({ 429 + exec(task: Task<any> | StreamTask<any>, channelOpts?: ChannelOptions): any { 430 + if (task instanceof StreamTask) { 431 + if (disposed) throw new Error('Worker pool is disposed'); 432 + const { iterable, done } = dispatchStream(pool[idx], task.id, task.args, channelOpts); 433 + track(done); 434 + return iterable; 435 + } 436 + if (disposed) return Promise.reject(new Error('Worker pool is disposed')); 437 + return track(execute(pool[idx], task.id, task.args)); 438 + }, 439 + })); 440 + Object.freeze(handles); 441 + 442 + const run: Runner = Object.assign( 443 + (taskOrTasks: Task<any> | Task<any>[] | StreamTask<any>, channelOpts?: ChannelOptions): any => { 444 + if (taskOrTasks instanceof StreamTask) { 445 + return dispatchStreamTask(taskOrTasks, channelOpts); 446 + } 447 + if (Array.isArray(taskOrTasks)) { 448 + return Promise.all(taskOrTasks.map((t) => dispatch(t))); 449 + } 450 + return dispatch(taskOrTasks); 451 + }, 452 + { 453 + get signal() { 454 + return ac.signal; 455 + }, 456 + get workers(): readonly WorkerHandle[] { 457 + return handles; 458 + }, 459 + [Symbol.dispose]() { 460 + disposed = true; 461 + ac.abort(); 462 + terminateAll(); 463 + }, 464 + async [Symbol.asyncDispose]() { 465 + disposed = true; 466 + ac.abort(); 467 + const settle = Promise.allSettled(inflight); 468 + if (opts?.shutdownTimeout != null) { 469 + const timeoutAc = new AbortController(); 470 + await Promise.race([ 471 + settle.finally(() => timeoutAc.abort()), 472 + setTimeout(opts.shutdownTimeout, undefined, { signal: timeoutAc.signal }).catch(() => {}), 473 + ]); 474 + } else { 475 + await settle; 476 + } 477 + terminateAll(); 478 + }, 479 + }, 480 + ); 481 + 482 + return run; 483 + } 484 + ``` 485 + 486 + - [ ] **Step 5: Run tests** 487 + 488 + Run: `node --no-warnings --test test/worker-handle.test.ts 2>&1 | tail -15` 489 + Expected: All tests pass. 490 + 491 + Run: `pnpm test 2>&1 | tail -10` 492 + Expected: All existing tests pass. 493 + 494 + Run: `pnpm tsc --noEmit 2>&1` 495 + Expected: No errors. 496 + 497 + - [ ] **Step 6: Commit** 498 + 499 + ```bash 500 + git add src/worker-pool.ts test/worker-handle.test.ts test/fixtures/worker-handle.ts 501 + git commit -m "feat: WorkerHandle, run.workers, and assign() dispatch support" 502 + ``` 503 + 504 + --- 505 + 506 + ### Task 5: Update Channel-Fanout Example 507 + 508 + **Files:** 509 + - Modify: `examples/channel-fanout/main.ts` 510 + 511 + - [ ] **Step 1: Update example to use assign + run.workers** 512 + 513 + Replace `examples/channel-fanout/main.ts` with: 514 + 515 + ```ts 516 + // Fan-out a single channel to multiple workers using work stealing. 517 + // Each item goes to whichever worker is ready first. 518 + // Requires Node v24+. 519 + // 520 + // Run: node examples/channel-fanout/main.ts 521 + 522 + import { workers, channel, assign } from '../../src/index.ts'; 523 + import { generate, process } from './work.ts'; 524 + 525 + { 526 + using run = workers(); 527 + const ch = channel(generate(200)); 528 + const fanout = run.workers.map((w) => { 529 + return assign(w, process(ch)); 530 + }); 531 + const results: number[][] = await run(fanout); 532 + 533 + for (let i = 0; i < results.length; i++) { 534 + console.log(`Worker ${i}: processed ${results[i].length} items`); 535 + } 536 + 537 + const all = results.flat().sort((a, b) => a - b); 538 + console.log(`\nTotal: ${all.length} items, none lost, none duplicated`); 539 + } 540 + ``` 541 + 542 + - [ ] **Step 2: Run example** 543 + 544 + Run: `node --no-warnings examples/channel-fanout/main.ts` 545 + Expected: Prints worker counts and total, exits cleanly. 546 + 547 + - [ ] **Step 3: Commit** 548 + 549 + ```bash 550 + git add examples/channel-fanout/main.ts 551 + git commit -m "refactor: channel-fanout example uses assign() and run.workers" 552 + ``` 553 + 554 + --- 555 + 556 + ### Task 6: Update README 557 + 558 + **Files:** 559 + - Modify: `README.md` 560 + 561 + - [ ] **Step 1: Add per-worker dispatch docs** 562 + 563 + Read `README.md`. Find the `channel()` and Fan-out section. Update the fan-out example to use `assign()` and `run.workers`. 564 + 565 + Replace the existing fan-out code example: 566 + 567 + ```ts 568 + const ch = channel(generate(100)); 569 + 570 + { 571 + using run = workers(4); 572 + const [a, b, c, d] = await run([process(ch), process(ch), process(ch), process(ch)]); 573 + // Items distributed across workers — no duplicates, no gaps 574 + } 575 + ``` 576 + 577 + With: 578 + 579 + ```ts 580 + const ch = channel(generate(100)); 581 + 582 + { 583 + using run = workers(); 584 + const fanout = run.workers.map((w) => { 585 + return assign(w, process(ch)); 586 + }); 587 + const results = await run(fanout); 588 + // Items distributed across workers — no duplicates, no gaps 589 + } 590 + ``` 591 + 592 + Update the import line in the code block above it from `import { workers, channel, mo } from 'moroutine';` to `import { workers, channel, assign, mo } from 'moroutine';`. 593 + 594 + - [ ] **Step 2: Run type check** 595 + 596 + Run: `pnpm tsc --noEmit 2>&1` 597 + Expected: No errors. 598 + 599 + - [ ] **Step 3: Commit** 600 + 601 + ```bash 602 + git add README.md 603 + git commit -m "docs: update fan-out example to use assign() and run.workers" 604 + ```