Offload functions to worker threads with shared memory primitives for Node.js.
1import type { Worker } from 'node:worker_threads';
2import type { ChannelOptions } from './channel.ts';
3
/**
 * Type-level sentinel. `typeof __task__` is the default type argument of `Task`, which lets
 * `Task` tell "no type argument given" apart from a real result type.
 * Ambient (`declare`) only: this declaration emits no runtime value.
 */
declare const __task__: unique symbol;
5
/**
 * A dispatchable task. The type argument selects the dispatch mode:
 * - `Task<T>` (non-iterable `T`) is a `PromiseLike<T>` — await it for a single value.
 * - `Task<AsyncIterable<T>>` is an `AsyncIterable<T>` — iterate it for a stream.
 * - `Task` (no type argument) is just the base shape shared by all tasks, useful for
 *   balancer signatures.
 *
 * `Task<never>` resolves to a `PromiseLike<never>`.
 */
export type Task<T = typeof __task__> = {
  readonly uid: number;
  readonly id: string;
  readonly args: unknown[];
  /** Optional and writable, unlike the other fields. */
  worker?: WorkerHandle;
} & (
  // `T` is wrapped in a one-element tuple so these two checks do not distribute over unions.
  // `never` is tested first: `[never]` would also satisfy the sentinel check below and be
  // mistaken for "no type argument".
  [T] extends [never]
    ? PromiseLike<T>
    : [T] extends [typeof __task__]
      ? // Base shape only. `unknown` is the identity for `&`, so nothing is added here
        // (it replaces `{}`, which means "any non-nullish value" rather than "empty object").
        unknown
      : // Naked `T`: this check distributes over unions, so `Task<A | B>` becomes
        // `PromiseLike<A> | PromiseLike<B>` rather than `PromiseLike<A | B>`.
        T extends AsyncIterable<infer U>
        ? AsyncIterable<U>
        : PromiseLike<T>
);
22
/**
 * Maps a tuple of tasks to the tuple of values they resolve to, position by position.
 * An entry that is not `PromiseLike` (for example a streaming task) maps to `never`.
 */
type TaskResults<Tasks extends Task<any>[]> = {
  [Index in keyof Tasks]: Tasks[Index] extends PromiseLike<infer Value> ? Value : never;
};
26
/** Strategy object that decides which worker of a pool runs each task. */
export interface Balancer {
  /**
   * Picks the worker that should run `task`.
   * Invoked synchronously on every dispatch.
   */
  select(workers: readonly WorkerHandle[], task: Task): WorkerHandle;
  /** Cleanup hook for async dispose; may be omitted. */
  [Symbol.asyncDispose]?(): Promise<void>;
  /** Cleanup hook for sync dispose; may be omitted. */
  [Symbol.dispose]?(): void;
}
36
/** Settings used to configure a worker pool. */
export interface WorkerOptions {
  /** Strategy for picking a worker per task. Round-robin is used when omitted. */
  balance?: Balancer;
  /**
   * Upper bound, in milliseconds, on how long async dispose waits for in-flight tasks.
   * Once exceeded, workers are force-terminated.
   */
  shutdownTimeout?: number;
}
44
/** Handle addressing one specific worker of a pool. */
export interface WorkerHandle {
  /** The worker thread behind this handle. */
  readonly thread: Worker;
  /** How many tasks are currently in flight on this worker. */
  readonly activeCount: number;
  /** Dispatches a streaming task pinned to this worker; iterate the result for its values. */
  exec<TItem>(task: Task<AsyncIterable<TItem>>, opts?: ChannelOptions): AsyncIterable<TItem>;
  /** Dispatches a single-value task pinned to this worker. */
  exec<TResult>(task: Task<TResult>): Promise<TResult>;
}
56
/**
 * A callable that dispatches tasks to a worker pool. It is disposable both synchronously
 * (`using` / `[Symbol.dispose]()`) and asynchronously (`await using` /
 * `[Symbol.asyncDispose]()`).
 *
 * Three call forms are supported:
 * - a streaming {@link Task}, optionally with channel options → `AsyncIterable<T>`;
 * - a single {@link Task} → `Promise<T>`;
 * - a tuple of tasks → `Promise<[...results]>`.
 */
export type Runner = {
  /** Streaming form: dispatches the task and yields its values as an async iterable. */
  <TItem>(task: Task<AsyncIterable<TItem>>, opts?: ChannelOptions): AsyncIterable<TItem>;
  /** Single form: dispatches one task and resolves with its result. */
  <TResult>(task: Task<TResult>): Promise<TResult>;
  /** Batch form: dispatches the tasks in parallel and resolves with all of their results. */
  <TTasks extends Task<any>[]>(tasks: [...TTasks]): Promise<TaskResults<TTasks>>;
  /** Worker handles, one per pool worker; the array is read-only. */
  readonly workers: readonly WorkerHandle[];
  /** AbortSignal that fires when the pool starts disposing. */
  readonly signal: AbortSignal;
  /** Aborts `signal`, waits for in-flight tasks to settle, then terminates the workers. */
  [Symbol.asyncDispose](): Promise<void>;
  /** Terminates every worker immediately. */
  [Symbol.dispose](): void;
};