Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 72 lines 3.2 kB view raw
1import type { Worker } from 'node:worker_threads'; 2import type { ChannelOptions } from './channel.ts'; 3 4declare const resultBrand: unique symbol; 5 6/** An inert task descriptor. Carries the result type `T` and argument types `A` 7 * at the type level, but is not itself a `PromiseLike` or `AsyncIterable`. 8 * Dispatch via a {@link Runner} or `await` a live task returned by `mo()`. */ 9export type Task<T = unknown, A extends unknown[] = unknown[]> = { 10 readonly uid: number; 11 readonly id: string; 12 readonly args: A; 13 worker?: WorkerHandle; 14 /** @internal Type brand for result type inference. Not present at runtime. */ 15 readonly [resultBrand]?: T; 16}; 17 18/** Resolves the dispatch return type: `AsyncIterable<U>` for streaming tasks, `Promise<T>` otherwise. */ 19export type RunResult<T> = T extends AsyncIterable<infer U> ? AsyncIterable<U> : Promise<T>; 20 21type TaskResults<T extends Task<any>[]> = { 22 [K in keyof T]: T[K] extends Task<infer R> ? R : never; 23}; 24 25/** A load balancing strategy for choosing which worker runs a task. */ 26export interface Balancer { 27 /** Choose a worker for the given task. Called synchronously on every dispatch. */ 28 select(workers: readonly WorkerHandle[], task: Task): WorkerHandle; 29 /** Optional cleanup on sync dispose. */ 30 [Symbol.dispose]?(): void; 31 /** Optional cleanup on async dispose. */ 32 [Symbol.asyncDispose]?(): Promise<void>; 33} 34 35/** Options for configuring a worker pool. */ 36export interface WorkerOptions { 37 /** Maximum time in ms to wait for in-flight tasks during async dispose. If exceeded, workers are force-terminated. */ 38 shutdownTimeout?: number; 39 /** Load balancing strategy. Defaults to round-robin. */ 40 balance?: Balancer; 41} 42 43/** A handle to a specific worker in a pool. */ 44export interface WorkerHandle { 45 /** Dispatches a task pinned to this worker. Returns `AsyncIterable<T>` for streaming tasks, `Promise<T>` otherwise. */ 46 exec<T>(task: Task<T>, opts?: ChannelOptions): RunResult<T>; 47 /** The underlying worker thread. */ 48 readonly thread: Worker; 49 /** Number of currently in-flight tasks on this worker. */ 50 readonly activeCount: number; 51} 52 53/** 54 * A callable that dispatches tasks to a worker pool. Disposable via `using` or `[Symbol.dispose]()`. 55 * 56 * @param task - A single {@link Task} to run on a worker. 57 * @returns `Promise<T>` for a single task, `Promise<[...results]>` for a batch, or `AsyncIterable<T>` for a streaming task. 58 */ 59export type Runner = { 60 /** Dispatches a batch of tasks in parallel and returns all results. */ 61 <T extends Task<any>[]>(tasks: [...T]): Promise<TaskResults<T>>; 62 /** Dispatches a task. Returns `AsyncIterable<T>` for streaming tasks, `Promise<T>` otherwise. */ 63 <T>(task: Task<T>, opts?: ChannelOptions): RunResult<T>; 64 /** AbortSignal that fires when the pool starts disposing. */ 65 readonly signal: AbortSignal; 66 /** Read-only array of worker handles, one per pool worker. */ 67 readonly workers: readonly WorkerHandle[]; 68 /** Terminates all workers immediately. */ 69 [Symbol.dispose](): void; 70 /** Aborts signal, waits for in-flight tasks to settle, then terminates workers. */ 71 [Symbol.asyncDispose](): Promise<void>; 72};