Offload functions to worker threads with shared memory primitives for Node.js.
1import type { Worker } from 'node:worker_threads';
2import type { ChannelOptions } from './channel.ts';
3
4declare const resultBrand: unique symbol;
5
6/** An inert task descriptor. Carries the result type `T` and argument types `A`
7 * at the type level, but is not itself a `PromiseLike` or `AsyncIterable`.
8 * Dispatch via a {@link Runner} or `await` a live task returned by `mo()`. */
9export type Task<T = unknown, A extends unknown[] = unknown[]> = {
10 readonly uid: number;
11 readonly id: string;
12 readonly args: A;
13 worker?: WorkerHandle;
14 /** @internal Type brand for result type inference. Not present at runtime. */
15 readonly [resultBrand]?: T;
16};
17
18/** Resolves the dispatch return type: `AsyncIterable<U>` for streaming tasks, `Promise<T>` otherwise. */
19export type RunResult<T> = T extends AsyncIterable<infer U> ? AsyncIterable<U> : Promise<T>;
20
21type TaskResults<T extends Task<any>[]> = {
22 [K in keyof T]: T[K] extends Task<infer R> ? R : never;
23};
24
25/** A load balancing strategy for choosing which worker runs a task. */
26export interface Balancer {
27 /** Choose a worker for the given task. Called synchronously on every dispatch. */
28 select(workers: readonly WorkerHandle[], task: Task): WorkerHandle;
29 /** Optional cleanup on sync dispose. */
30 [Symbol.dispose]?(): void;
31 /** Optional cleanup on async dispose. */
32 [Symbol.asyncDispose]?(): Promise<void>;
33}
34
35/** Options for configuring a worker pool. */
36export interface WorkerOptions {
37 /** Maximum time in ms to wait for in-flight tasks during async dispose. If exceeded, workers are force-terminated. */
38 shutdownTimeout?: number;
39 /** Load balancing strategy. Defaults to round-robin. */
40 balance?: Balancer;
41}
42
43/** A handle to a specific worker in a pool. */
44export interface WorkerHandle {
45 /** Dispatches a task pinned to this worker. Returns `AsyncIterable<T>` for streaming tasks, `Promise<T>` otherwise. */
46 exec<T>(task: Task<T>, opts?: ChannelOptions): RunResult<T>;
47 /** The underlying worker thread. */
48 readonly thread: Worker;
49 /** Number of currently in-flight tasks on this worker. */
50 readonly activeCount: number;
51}
52
53/**
54 * A callable that dispatches tasks to a worker pool. Disposable via `using` or `[Symbol.dispose]()`.
55 *
56 * @param task - A single {@link Task} to run on a worker.
57 * @returns `Promise<T>` for a single task, `Promise<[...results]>` for a batch, or `AsyncIterable<T>` for a streaming task.
58 */
59export type Runner = {
60 /** Dispatches a batch of tasks in parallel and returns all results. */
61 <T extends Task<any>[]>(tasks: [...T]): Promise<TaskResults<T>>;
62 /** Dispatches a task. Returns `AsyncIterable<T>` for streaming tasks, `Promise<T>` otherwise. */
63 <T>(task: Task<T>, opts?: ChannelOptions): RunResult<T>;
64 /** AbortSignal that fires when the pool starts disposing. */
65 readonly signal: AbortSignal;
66 /** Read-only array of worker handles, one per pool worker. */
67 readonly workers: readonly WorkerHandle[];
68 /** Terminates all workers immediately. */
69 [Symbol.dispose](): void;
70 /** Aborts signal, waits for in-flight tasks to settle, then terminates workers. */
71 [Symbol.asyncDispose](): Promise<void>;
72};