Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: make Task<T, A> a pure descriptor; keep protocols on live tasks

Task<T, A> now carries result and arg types via brand symbols, without
PromiseLike or AsyncIterable protocols. Live tasks returned by mo()
intersect with the appropriate protocol for awaiting/iteration.

Enables accurate result-type inference for helpers like map() that
consume inert Task values — the bare descriptor can be yielded from
a generator or held in arrays without triggering auto-await.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+24 -27
+1 -1
src/index.ts
··· 8 8 export { transfer } from './transfer.ts'; 9 9 export { assign } from './assign.ts'; 10 10 export { roundRobin, leastBusy } from './balancers.ts'; 11 - export type { Task, Balancer, Runner, WorkerHandle, WorkerOptions } from './runner.ts'; 11 + export type { Task, RunResult, Balancer, Runner, WorkerHandle, WorkerOptions } from './runner.ts'; 12 12 export { 13 13 shared, 14 14 int8,
+4 -4
src/mo.ts
··· 18 18 type TaskableArgs<A extends unknown[]> = { [K in keyof A]: Arg<A[K]> }; 19 19 20 20 type Moroutine<A extends unknown[], R> = { 21 - (...args: A): Task<Awaited<R>>; 22 - (...args: TaskableArgs<A>): Task<Awaited<R>>; 21 + (...args: A): Task<Awaited<R>, A> & PromiseLike<Awaited<R>>; 22 + (...args: TaskableArgs<A>): Task<Awaited<R>, A> & PromiseLike<Awaited<R>>; 23 23 }; 24 24 25 25 type AsyncIterableMoroutine<A extends unknown[], Y> = { 26 - (...args: A): Task<AsyncIterable<Y>>; 27 - (...args: TaskableArgs<A>): Task<AsyncIterable<Y>>; 26 + (...args: A): Task<AsyncIterable<Y>, A> & AsyncIterable<Y>; 27 + (...args: TaskableArgs<A>): Task<AsyncIterable<Y>, A> & AsyncIterable<Y>; 28 28 }; 29 29 30 30 type IsNever<T> = [T] extends [never] ? true : false;
+19 -22
src/runner.ts
··· 1 1 import type { Worker } from 'node:worker_threads'; 2 2 import type { ChannelOptions } from './channel.ts'; 3 3 4 - declare const __task__: unique symbol; 4 + declare const resultBrand: unique symbol; 5 + declare const argsBrand: unique symbol; 5 6 6 - /** A dispatchable task. The type parameter determines the dispatch mode: 7 - * - `Task<T>` (non-iterable T) is a `PromiseLike<T>` — await it for a single value. 8 - * - `Task<AsyncIterable<T>>` is an `AsyncIterable<T>` — iterate it for a stream. 9 - * - `Task` (no param) is the base shape shared by all tasks, useful for balancer signatures. */ 10 - export type Task<T = typeof __task__> = { 7 + /** An inert task descriptor. Carries the result type `T` and argument types `A` 8 + * at the type level, but is not itself a `PromiseLike` or `AsyncIterable`. 9 + * Dispatch via a {@link Runner} or `await` a live task returned by `mo()`. */ 10 + export type Task<T = unknown, A extends unknown[] = unknown[]> = { 11 11 readonly uid: number; 12 12 readonly id: string; 13 13 readonly args: unknown[]; 14 14 worker?: WorkerHandle; 15 - } & ([T] extends [never] 16 - ? PromiseLike<T> 17 - : [T] extends [typeof __task__] 18 - ? {} 19 - : T extends AsyncIterable<infer U> 20 - ? AsyncIterable<U> 21 - : PromiseLike<T>); 15 + /** @internal Type brand for result type inference. Not present at runtime. */ 16 + readonly [resultBrand]?: T; 17 + /** @internal Type brand for argument type inference. Not present at runtime. */ 18 + readonly [argsBrand]?: A; 19 + }; 20 + 21 + /** Resolves the dispatch return type: `AsyncIterable<U>` for streaming tasks, `Promise<T>` otherwise. */ 22 + export type RunResult<T> = T extends AsyncIterable<infer U> ? AsyncIterable<U> : Promise<T>; 22 23 23 24 type TaskResults<T extends Task<any>[]> = { 24 - [K in keyof T]: T[K] extends PromiseLike<infer R> ? R : never; 25 + [K in keyof T]: T[K] extends Task<infer R> ? R : never; 25 26 }; 26 27 27 28 /** A load balancing strategy for choosing which worker runs a task. */ ··· 44 45 45 46 /** A handle to a specific worker in a pool. */ 46 47 export interface WorkerHandle { 47 - /** Dispatches a streaming task pinned to this worker. */ 48 - exec<T>(task: Task<AsyncIterable<T>>, opts?: ChannelOptions): AsyncIterable<T>; 49 - /** Dispatches a task pinned to this worker. */ 50 - exec<T>(task: Task<T>): Promise<T>; 48 + /** Dispatches a task pinned to this worker. Returns `AsyncIterable<T>` for streaming tasks, `Promise<T>` otherwise. */ 49 + exec<T>(task: Task<T>, opts?: ChannelOptions): RunResult<T>; 51 50 /** The underlying worker thread. */ 52 51 readonly thread: Worker; 53 52 /** Number of currently in-flight tasks on this worker. */ ··· 61 60 * @returns `Promise<T>` for a single task, `Promise<[...results]>` for a batch, or `AsyncIterable<T>` for a streaming task. 62 61 */ 63 62 export type Runner = { 64 - /** Dispatches a streaming task and returns an async iterable of yielded values. */ 65 - <T>(task: Task<AsyncIterable<T>>, opts?: ChannelOptions): AsyncIterable<T>; 66 - /** Dispatches a single task and returns its result. */ 67 - <T>(task: Task<T>): Promise<T>; 68 63 /** Dispatches a batch of tasks in parallel and returns all results. */ 69 64 <T extends Task<any>[]>(tasks: [...T]): Promise<TaskResults<T>>; 65 + /** Dispatches a task. Returns `AsyncIterable<T>` for streaming tasks, `Promise<T>` otherwise. */ 66 + <T>(task: Task<T>, opts?: ChannelOptions): RunResult<T>; 70 67 /** AbortSignal that fires when the pool starts disposing. */ 71 68 readonly signal: AbortSignal; 72 69 /** Read-only array of worker handles, one per pool worker. */