Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at e46aca05dbf9a2204886d23f7ae3b34883f4e5c6 78 lines 3.2 kB view raw
1import type { Worker } from 'node:worker_threads'; 2import type { ChannelOptions } from './channel.ts'; 3 4declare const __task__: unique symbol; 5 6/** A dispatchable task. The type parameter determines the dispatch mode: 7 * - `Task<T>` (non-iterable T) is a `PromiseLike<T>` — await it for a single value. 8 * - `Task<AsyncIterable<T>>` is an `AsyncIterable<T>` — iterate it for a stream. 9 * - `Task` (no param) is the base shape shared by all tasks, useful for balancer signatures. */ 10export type Task<T = typeof __task__> = { 11 readonly uid: number; 12 readonly id: string; 13 readonly args: unknown[]; 14 worker?: WorkerHandle; 15} & ([T] extends [never] 16 ? PromiseLike<T> 17 : [T] extends [typeof __task__] 18 ? {} 19 : T extends AsyncIterable<infer U> 20 ? AsyncIterable<U> 21 : PromiseLike<T>); 22 23type TaskResults<T extends Task<any>[]> = { 24 [K in keyof T]: T[K] extends PromiseLike<infer R> ? R : never; 25}; 26 27/** A load balancing strategy for choosing which worker runs a task. */ 28export interface Balancer { 29 /** Choose a worker for the given task. Called synchronously on every dispatch. */ 30 select(workers: readonly WorkerHandle[], task: Task): WorkerHandle; 31 /** Optional cleanup on sync dispose. */ 32 [Symbol.dispose]?(): void; 33 /** Optional cleanup on async dispose. */ 34 [Symbol.asyncDispose]?(): Promise<void>; 35} 36 37/** Options for configuring a worker pool. */ 38export interface WorkerOptions { 39 /** Maximum time in ms to wait for in-flight tasks during async dispose. If exceeded, workers are force-terminated. */ 40 shutdownTimeout?: number; 41 /** Load balancing strategy. Defaults to round-robin. */ 42 balance?: Balancer; 43} 44 45/** A handle to a specific worker in a pool. */ 46export interface WorkerHandle { 47 /** Dispatches a streaming task pinned to this worker. */ 48 exec<T>(task: Task<AsyncIterable<T>>, opts?: ChannelOptions): AsyncIterable<T>; 49 /** Dispatches a task pinned to this worker. */ 50 exec<T>(task: Task<T>): Promise<T>; 51 /** The underlying worker thread. */ 52 readonly thread: Worker; 53 /** Number of currently in-flight tasks on this worker. */ 54 readonly activeCount: number; 55} 56 57/** 58 * A callable that dispatches tasks to a worker pool. Disposable via `using` or `[Symbol.dispose]()`. 59 * 60 * @param task - A single {@link Task} to run on a worker. 61 * @returns `Promise<T>` for a single task, `Promise<[...results]>` for a batch, or `AsyncIterable<T>` for a streaming task. 62 */ 63export type Runner = { 64 /** Dispatches a streaming task and returns an async iterable of yielded values. */ 65 <T>(task: Task<AsyncIterable<T>>, opts?: ChannelOptions): AsyncIterable<T>; 66 /** Dispatches a single task and returns its result. */ 67 <T>(task: Task<T>): Promise<T>; 68 /** Dispatches a batch of tasks in parallel and returns all results. */ 69 <T extends Task<any>[]>(tasks: [...T]): Promise<TaskResults<T>>; 70 /** AbortSignal that fires when the pool starts disposing. */ 71 readonly signal: AbortSignal; 72 /** Read-only array of worker handles, one per pool worker. */ 73 readonly workers: readonly WorkerHandle[]; 74 /** Terminates all workers immediately. */ 75 [Symbol.dispose](): void; 76 /** Aborts signal, waits for in-flight tasks to settle, then terminates workers. */ 77 [Symbol.asyncDispose](): Promise<void>; 78};