···11+# Async Worker Cleanup
22+33+## Goal
44+55+Support graceful async shutdown of worker pools via `await using`, with an abort signal that tasks can opt into for cooperative cancellation.
66+77+## Runner Changes
88+99+### New properties
1010+1111+- `run.signal: AbortSignal` — fires when the pool starts disposing (either sync or async path). Tasks can thread this signal to support cooperative cancellation.
1212+- `run[Symbol.asyncDispose](): Promise<void>` — graceful shutdown: aborts signal, waits for in-flight tasks to settle, then terminates workers.
1313+1414+### Updated type
1515+1616+```ts
1717+type Runner = {
1818+ <T>(task: Task<T>): Promise<T>;
1919+ <T extends Task<any>[]>(tasks: [...T]): Promise<TaskResults<T>>;
2020+ <T>(task: StreamTask<T>, opts?: ChannelOptions): AsyncIterable<T>;
2121+ readonly signal: AbortSignal;
2222+ [Symbol.dispose](): void;
2323+ [Symbol.asyncDispose](): Promise<void>;
2424+};
2525+```
2626+2727+### `workers()` signature
2828+2929+```ts
3030+workers(size?: number, opts?: { shutdownTimeout?: number }): Runner
3131+```
3232+3333+`shutdownTimeout` controls the maximum time `asyncDispose` waits for in-flight tasks before force-terminating. No default — omitting it means wait indefinitely.
3434+3535+## Dispose Behavior
3636+3737+| | `Symbol.dispose` (sync) | `Symbol.asyncDispose` (async) |
3838+|------------------|----------------------------|---------------------------------------|
3939+| Abort signal | fires | fires |
4040+| In-flight tasks | abandoned | awaited via `Promise.allSettled` |
4141+| New tasks | rejected | rejected |
4242+| Workers | terminated immediately | terminated after tasks settle |
4343+| Timeout | n/a | force-terminates after `shutdownTimeout` ms |
4444+4545+Both paths set `disposed = true` immediately so new task submissions are rejected.
4646+4747+## In-Flight Tracking
4848+4949+The pool owns a `Set<Promise<unknown>>` of active dispatches.
5050+5151+**Regular tasks:** `dispatch()` wraps the `execute()` promise — added to the set on dispatch, removed on settle (resolve or reject).
5252+5353+**Streaming tasks:** The promise represents the full lifecycle of the stream, not just dispatch. It resolves when the consumer finishes iterating (natural completion, `return()`, or error). `dispatchStream()` needs to expose a "done" promise for this.
5454+5555+**Batch tasks:** `run([task1, task2])` — each individual task in the batch is tracked separately. The batch `Promise.all` is not separately tracked.
5656+5757+## Async Dispose Flow
5858+5959+1. Set `disposed = true` (reject new tasks from here)
6060+2. Fire abort signal (`ac.abort()`)
6161+3. `await Promise.allSettled(inflightSet)`
6262+4. If `shutdownTimeout` is set, race the allSettled with a timeout. If timeout wins, proceed to terminate.
6363+5. Terminate all workers
6464+6. Clear pool
6565+6666+## Files Changed
6767+6868+- `src/runner.ts` — update Runner type with `signal`, `Symbol.asyncDispose`
6969+- `src/worker-pool.ts` — AbortController ownership, in-flight set, async dispose logic, shutdown timeout
7070+- `src/execute.ts` — `dispatchStream` returns a "done" promise alongside the AsyncIterable for stream lifecycle tracking
7171+- Tests for: async dispose waits for tasks, signal fires on dispose, sync dispose still works, timeout force-terminates, rejected tasks after dispose, streaming task lifecycle tracking
7272+7373+## Usage
7474+7575+```ts
7676+// Sync dispose — immediate termination (existing behavior)
7777+{
7878+ using run = workers(4);
7979+ await run(someTask());
8080+}
8181+8282+// Async dispose — graceful shutdown
8383+{
8484+ await using run = workers(4);
8585+ run(longTask(run.signal)); // task can react to abort
8686+ run(otherTask()); // runs to completion
8787+}
8888+// signal aborted, waited for both tasks, then terminated
8989+9090+// With timeout
9191+{
9292+ await using run = workers(4, { shutdownTimeout: 5000 });
9393+ run(longTask(run.signal));
9494+}
9595+// if tasks don't finish within 5s, force-terminate
9696+```