Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

docs: async worker cleanup implementation plan

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+479
+479
docs/superpowers/plans/2026-04-12-async-worker-cleanup.md
··· 1 + # Async Worker Cleanup Implementation Plan 2 + 3 + > **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. 4 + 5 + **Goal:** Support graceful async shutdown of worker pools via `await using`, with a pool-owned AbortSignal for cooperative cancellation and optional shutdown timeout. 6 + 7 + **Architecture:** The pool owns an `AbortController` and a `Set<Promise>` of in-flight tasks. `Symbol.asyncDispose` aborts the signal, awaits the in-flight set (raced against an optional timeout), then terminates workers. `Symbol.dispose` aborts the signal and terminates immediately. `dispatchStream` is updated to return a "done" promise alongside the AsyncIterable so streams can be tracked. 8 + 9 + **Tech Stack:** TypeScript (erasable syntax only), worker_threads, node:test. 10 + 11 + --- 12 + 13 + ### Task 1: Update Runner Type 14 + 15 + Add `signal` and `Symbol.asyncDispose` to the Runner type, and add `WorkerOptions` for the shutdown timeout. 16 + 17 + **Files:** 18 + - Modify: `src/runner.ts` 19 + 20 + - [ ] **Step 1: Update Runner type and add WorkerOptions** 21 + 22 + ```ts 23 + // src/runner.ts 24 + import type { Task } from './task.ts'; 25 + import type { StreamTask } from './stream-task.ts'; 26 + import type { ChannelOptions } from './channel.ts'; 27 + 28 + type TaskResults<T extends Task<any>[]> = { [K in keyof T]: T[K] extends Task<infer R> ? R : never }; 29 + 30 + /** Options for configuring a worker pool. */ 31 + export interface WorkerOptions { 32 + /** Maximum time in ms to wait for in-flight tasks during async dispose. If exceeded, workers are force-terminated. */ 33 + shutdownTimeout?: number; 34 + } 35 + 36 + /** 37 + * A callable that dispatches tasks to a worker pool. Disposable via `using` or `[Symbol.dispose]()`. 38 + * 39 + * @param task - A single {@link Task} to run on a worker. 40 + * @returns `Promise<T>` for a single task, `Promise<[...results]>` for a batch, or `AsyncIterable<T>` for a streaming task. 41 + */ 42 + export type Runner = { 43 + /** Dispatches a single task and returns its result. */ 44 + <T>(task: Task<T>): Promise<T>; 45 + /** Dispatches a batch of tasks in parallel and returns all results. */ 46 + <T extends Task<any>[]>(tasks: [...T]): Promise<TaskResults<T>>; 47 + /** Dispatches a streaming task and returns an async iterable of yielded values. */ 48 + <T>(task: StreamTask<T>, opts?: ChannelOptions): AsyncIterable<T>; 49 + /** AbortSignal that fires when the pool starts disposing. */ 50 + readonly signal: AbortSignal; 51 + /** Terminates all workers immediately. */ 52 + [Symbol.dispose](): void; 53 + /** Aborts signal, waits for in-flight tasks to settle, then terminates workers. */ 54 + [Symbol.asyncDispose](): Promise<void>; 55 + }; 56 + ``` 57 + 58 + - [ ] **Step 2: Run type check** 59 + 60 + Run: `pnpm tsc --noEmit 2>&1` 61 + Expected: No errors (Runner type is wider now, worker-pool.ts doesn't satisfy it yet — but it's cast via `Object.assign` so it should still pass). 62 + 63 + - [ ] **Step 3: Commit** 64 + 65 + ```bash 66 + git add src/runner.ts 67 + git commit -m "feat: add signal, asyncDispose, and WorkerOptions to Runner type" 68 + ``` 69 + 70 + --- 71 + 72 + ### Task 2: Export WorkerOptions 73 + 74 + Export the new type from the package index. 75 + 76 + **Files:** 77 + - Modify: `src/index.ts` 78 + 79 + - [ ] **Step 1: Add WorkerOptions export** 80 + 81 + Add to `src/index.ts`: 82 + ```ts 83 + export type { Runner, WorkerOptions } from './runner.ts'; 84 + ``` 85 + 86 + Replace the existing `export type { Runner } from './runner.ts';` line. 87 + 88 + - [ ] **Step 2: Run type check** 89 + 90 + Run: `pnpm tsc --noEmit 2>&1` 91 + Expected: No errors. 92 + 93 + - [ ] **Step 3: Commit** 94 + 95 + ```bash 96 + git add src/index.ts 97 + git commit -m "feat: export WorkerOptions type" 98 + ``` 99 + 100 + --- 101 + 102 + ### Task 3: Stream Lifecycle Tracking in execute.ts 103 + 104 + Update `dispatchStream` to return both the `AsyncIterable` and a `Promise` that resolves when the stream finishes (natural completion, `return()`, or error). This is needed so the pool can track when streaming tasks are done. 105 + 106 + **Files:** 107 + - Modify: `src/execute.ts` 108 + 109 + - [ ] **Step 1: Change dispatchStream return type** 110 + 111 + Read `src/execute.ts`. Change `dispatchStream` to return an object with both the iterable and a done promise. 112 + 113 + The current signature: 114 + ```ts 115 + export function dispatchStream<T>(worker: Worker, id: string, args: unknown[], opts?: ChannelOptions): AsyncIterable<T> 116 + ``` 117 + 118 + Change to: 119 + ```ts 120 + export interface StreamDispatch<T> { 121 + iterable: AsyncIterable<T>; 122 + done: Promise<void>; 123 + } 124 + 125 + export function dispatchStream<T>(worker: Worker, id: string, args: unknown[], opts?: ChannelOptions): StreamDispatch<T> 126 + ``` 127 + 128 + In the function body, create a deferred promise. The existing `return { [Symbol.asyncIterator]() { ... } }` block should be captured as the `iterable`. The `done` promise resolves when the stream ends. 129 + 130 + At the top of the function, add: 131 + ```ts 132 + let resolveDone: () => void; 133 + const done = new Promise<void>((r) => { resolveDone = r; }); 134 + ``` 135 + 136 + In the existing `port1.on('message', ...)` handler, when `msg.done` or `msg.error` is received (the stream has ended from the worker side), call `resolveDone()`. 137 + 138 + In the `return()` method of the async iterator (consumer breaks early), also call `resolveDone()`. 139 + 140 + Return `{ iterable, done }` instead of just the iterable. 141 + 142 + - [ ] **Step 2: Update worker-pool.ts to use StreamDispatch** 143 + 144 + Read `src/worker-pool.ts`. In the `run` callable, the StreamTask branch currently returns `dispatchStream(...)` directly (which was the AsyncIterable). Update it to destructure: 145 + 146 + ```ts 147 + if (taskOrTasks instanceof StreamTask) { 148 + if (disposed) throw new Error('Worker pool is disposed'); 149 + const worker = pool[next % pool.length]; 150 + next++; 151 + const { iterable } = dispatchStream(worker, taskOrTasks.id, taskOrTasks.args, opts); 152 + return iterable; 153 + } 154 + ``` 155 + 156 + For now, ignore the `done` promise — Task 4 will use it. 157 + 158 + - [ ] **Step 3: Run tests** 159 + 160 + Run: `pnpm test 2>&1 | tail -10` 161 + Expected: All tests pass (behavior unchanged, just return type restructured). 162 + 163 + - [ ] **Step 4: Run type check** 164 + 165 + Run: `pnpm tsc --noEmit 2>&1` 166 + Expected: No errors. 167 + 168 + - [ ] **Step 5: Commit** 169 + 170 + ```bash 171 + git add src/execute.ts src/worker-pool.ts 172 + git commit -m "refactor: dispatchStream returns StreamDispatch with done promise" 173 + ``` 174 + 175 + --- 176 + 177 + ### Task 4: Implement Async Dispose in worker-pool.ts 178 + 179 + Add the AbortController, in-flight tracking set, and both dispose paths. 180 + 181 + **Files:** 182 + - Modify: `src/worker-pool.ts` 183 + 184 + - [ ] **Step 1: Write failing tests** 185 + 186 + Create `test/async-dispose.test.ts`: 187 + 188 + ```ts 189 + // test/async-dispose.test.ts 190 + import { describe, it } from 'node:test'; 191 + import assert from 'node:assert/strict'; 192 + import { workers } from 'moroutine'; 193 + import { mo } from 'moroutine'; 194 + 195 + const slowTask = mo(import.meta, async (ms: number): Promise<string> => { 196 + await new Promise((r) => setTimeout(r, ms)); 197 + return 'done'; 198 + }); 199 + 200 + const waitForAbort = mo(import.meta, (signal: AbortSignal): Promise<string> => { 201 + return new Promise((resolve) => { 202 + if (signal.aborted) { resolve('aborted'); return; } 203 + signal.addEventListener('abort', () => resolve('aborted')); 204 + }); 205 + }); 206 + 207 + describe('async dispose', () => { 208 + it('waits for in-flight tasks to complete', async () => { 209 + const run = workers(1); 210 + const promise = run(slowTask(100)); 211 + await run[Symbol.asyncDispose](); 212 + const result = await promise; 213 + assert.equal(result, 'done'); 214 + }); 215 + 216 + it('fires signal on async dispose', async () => { 217 + const run = workers(1); 218 + const promise = run(waitForAbort(run.signal)); 219 + await run[Symbol.asyncDispose](); 220 + const result = await promise; 221 + assert.equal(result, 'aborted'); 222 + }); 223 + 224 + it('fires signal on sync dispose', async () => { 225 + const run = workers(1); 226 + assert.equal(run.signal.aborted, false); 227 + run[Symbol.dispose](); 228 + assert.equal(run.signal.aborted, true); 229 + }); 230 + 231 + it('rejects new tasks after async dispose starts', async () => { 232 + const run = workers(1); 233 + await run[Symbol.asyncDispose](); 234 + await assert.rejects(() => run(slowTask(10)), { message: /disposed/ }); 235 + }); 236 + 237 + it('force-terminates after shutdownTimeout', async () => { 238 + const run = workers(1, { shutdownTimeout: 50 }); 239 + run(slowTask(10_000)); // will not finish in time 240 + const start = performance.now(); 241 + await run[Symbol.asyncDispose](); 242 + const elapsed = performance.now() - start; 243 + assert.ok(elapsed < 500, `Expected fast teardown, took ${elapsed}ms`); 244 + }); 245 + }); 246 + ``` 247 + 248 + - [ ] **Step 2: Run tests to verify they fail** 249 + 250 + Run: `node --experimental-strip-types --no-warnings --test test/async-dispose.test.ts 2>&1 | tail -15` 251 + Expected: FAIL — `run.signal` and `run[Symbol.asyncDispose]` don't exist yet. 252 + 253 + - [ ] **Step 3: Implement async dispose in worker-pool.ts** 254 + 255 + Read `src/worker-pool.ts`. Update the `workers()` function: 256 + 257 + ```ts 258 + import { Worker } from 'node:worker_threads'; 259 + import { availableParallelism } from 'node:os'; 260 + import { setupWorker, execute, dispatchStream } from './execute.ts'; 261 + import type { Task } from './task.ts'; 262 + import { StreamTask } from './stream-task.ts'; 263 + import type { ChannelOptions } from './channel.ts'; 264 + import type { Runner, WorkerOptions } from './runner.ts'; 265 + 266 + const workerEntryUrl = new URL('./worker-entry.ts', import.meta.url); 267 + 268 + /** 269 + * Creates a pool of worker threads that dispatch tasks with round-robin scheduling. 270 + * @param size - Number of worker threads in the pool. Defaults to the number of available CPUs. 271 + * @param opts - Optional configuration including shutdown timeout. 272 + * @returns A disposable {@link Runner} for dispatching tasks. 273 + */ 274 + export function workers(size: number = availableParallelism(), opts?: WorkerOptions): Runner { 275 + const pool: Worker[] = []; 276 + for (let i = 0; i < size; i++) { 277 + const worker = new Worker(workerEntryUrl); 278 + setupWorker(worker); 279 + worker.unref(); 280 + pool.push(worker); 281 + } 282 + 283 + let next = 0; 284 + let disposed = false; 285 + const ac = new AbortController(); 286 + const inflight = new Set<Promise<unknown>>(); 287 + 288 + function track<T>(promise: Promise<T>): Promise<T> { 289 + inflight.add(promise); 290 + promise.finally(() => inflight.delete(promise)); 291 + return promise; 292 + } 293 + 294 + function terminateAll(): void { 295 + for (const worker of pool) { 296 + worker.terminate(); 297 + } 298 + pool.length = 0; 299 + } 300 + 301 + function dispatch<T>(task: Task<T>): Promise<T> { 302 + if (disposed) return Promise.reject(new Error('Worker pool is disposed')); 303 + const worker = pool[next % pool.length]; 304 + next++; 305 + return track(execute<T>(worker, task.id, task.args)); 306 + } 307 + 308 + const run: Runner = Object.assign( 309 + (taskOrTasks: Task<any> | Task<any>[] | StreamTask<any>, channelOpts?: ChannelOptions): any => { 310 + if (taskOrTasks instanceof StreamTask) { 311 + if (disposed) throw new Error('Worker pool is disposed'); 312 + const worker = pool[next % pool.length]; 313 + next++; 314 + const { iterable, done } = dispatchStream(worker, taskOrTasks.id, taskOrTasks.args, channelOpts); 315 + track(done); 316 + return iterable; 317 + } 318 + if (Array.isArray(taskOrTasks)) { 319 + return Promise.all(taskOrTasks.map((t) => dispatch(t))); 320 + } 321 + return dispatch(taskOrTasks); 322 + }, 323 + { 324 + get signal() { 325 + return ac.signal; 326 + }, 327 + [Symbol.dispose]() { 328 + disposed = true; 329 + ac.abort(); 330 + terminateAll(); 331 + }, 332 + async [Symbol.asyncDispose]() { 333 + disposed = true; 334 + ac.abort(); 335 + const settle = Promise.allSettled(inflight); 336 + if (opts?.shutdownTimeout != null) { 337 + await Promise.race([ 338 + settle, 339 + new Promise<void>((r) => setTimeout(r, opts.shutdownTimeout)), 340 + ]); 341 + } else { 342 + await settle; 343 + } 344 + terminateAll(); 345 + }, 346 + }, 347 + ); 348 + 349 + return run; 350 + } 351 + ``` 352 + 353 + - [ ] **Step 4: Run tests** 354 + 355 + Run: `node --experimental-strip-types --no-warnings --test test/async-dispose.test.ts 2>&1 | tail -15` 356 + Expected: All 5 tests pass. 357 + 358 + Run: `pnpm test 2>&1 | tail -10` 359 + Expected: All existing tests still pass. 360 + 361 + - [ ] **Step 5: Run type check** 362 + 363 + Run: `pnpm tsc --noEmit 2>&1` 364 + Expected: No errors. 365 + 366 + - [ ] **Step 6: Commit** 367 + 368 + ```bash 369 + git add src/worker-pool.ts test/async-dispose.test.ts 370 + git commit -m "feat: async dispose with signal, in-flight tracking, and shutdown timeout" 371 + ``` 372 + 373 + --- 374 + 375 + ### Task 5: Streaming Async Dispose Test 376 + 377 + Verify that async dispose waits for streaming tasks to finish iterating. 378 + 379 + **Files:** 380 + - Modify: `test/async-dispose.test.ts` 381 + 382 + - [ ] **Step 1: Add streaming dispose test** 383 + 384 + Add to `test/async-dispose.test.ts`, with a new fixture at the top of the file: 385 + 386 + ```ts 387 + const slowStream = mo(import.meta, async function* (n: number) { 388 + for (let i = 0; i < n; i++) { 389 + await new Promise((r) => setTimeout(r, 20)); 390 + yield i; 391 + } 392 + }); 393 + ``` 394 + 395 + Add the test inside the existing `describe`: 396 + 397 + ```ts 398 + it('waits for streaming task to finish', async () => { 399 + const run = workers(1); 400 + const results: number[] = []; 401 + const iterating = (async () => { 402 + for await (const n of run(slowStream(5))) { 403 + results.push(n); 404 + } 405 + })(); 406 + // Let a couple items flow before disposing 407 + await new Promise((r) => setTimeout(r, 60)); 408 + await run[Symbol.asyncDispose](); 409 + await iterating; 410 + assert.equal(results.length, 5); 411 + }); 412 + ``` 413 + 414 + - [ ] **Step 2: Run test** 415 + 416 + Run: `node --experimental-strip-types --no-warnings --test test/async-dispose.test.ts 2>&1 | tail -15` 417 + Expected: All 6 tests pass. 418 + 419 + - [ ] **Step 3: Commit** 420 + 421 + ```bash 422 + git add test/async-dispose.test.ts 423 + git commit -m "test: verify async dispose waits for streaming tasks" 424 + ``` 425 + 426 + --- 427 + 428 + ### Task 6: Update TSDoc and README 429 + 430 + Document the new async dispose pattern and `run.signal`. 431 + 432 + **Files:** 433 + - Modify: `README.md` 434 + - Modify: `src/worker-pool.ts` (TSDoc already updated in Task 4) 435 + 436 + - [ ] **Step 1: Add to README** 437 + 438 + Read `README.md`. Find the `### \`workers(size)\`` section. Update it to document the new options and dispose patterns: 439 + 440 + After the existing `workers(size)` section content, add: 441 + 442 + ```markdown 443 + #### Graceful Shutdown 444 + 445 + Use `await using` for graceful async shutdown. The pool exposes a `signal` that fires when disposal begins — thread it into tasks for cooperative cancellation. 446 + 447 + \`\`\`ts 448 + { 449 + await using run = workers(4); 450 + 451 + run(longTask(run.signal)); // task can react to abort 452 + run(otherTask()); // runs to completion 453 + } 454 + // signal fired, waited for both tasks, then workers terminated 455 + \`\`\` 456 + 457 + Use `using` (without `await`) for immediate termination, same as before. 458 + 459 + A `shutdownTimeout` option force-terminates workers if graceful shutdown takes too long: 460 + 461 + \`\`\`ts 462 + { 463 + await using run = workers(4, { shutdownTimeout: 5000 }); 464 + // ... 465 + } 466 + \`\`\` 467 + ``` 468 + 469 + - [ ] **Step 2: Run type check** 470 + 471 + Run: `pnpm tsc --noEmit 2>&1` 472 + Expected: No errors. 473 + 474 + - [ ] **Step 3: Commit** 475 + 476 + ```bash 477 + git add README.md 478 + git commit -m "docs: document async dispose and run.signal in README" 479 + ```