Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: add WorkerHandle type and workers property to Runner

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

+32 -2
+1 -1
src/index.ts
··· 6 6 export type { ChannelOptions } from './channel.ts'; 7 7 export { workers } from './worker-pool.ts'; 8 8 export { transfer } from './transfer.ts'; 9 - export type { Runner, WorkerOptions } from './runner.ts'; 9 + export type { Runner, WorkerHandle, WorkerOptions } from './runner.ts'; 10 10 export { 11 11 shared, 12 12 int8,
+10
src/runner.ts
··· 10 10 shutdownTimeout?: number; 11 11 } 12 12 13 + /** A handle to a specific worker in a pool. */ 14 + export interface WorkerHandle { 15 + /** Dispatches a task pinned to this worker. */ 16 + exec<T>(task: Task<T>): Promise<T>; 17 + /** Dispatches a streaming task pinned to this worker. */ 18 + exec<T>(task: StreamTask<T>, opts?: ChannelOptions): AsyncIterable<T>; 19 + } 20 + 13 21 /** 14 22 * A callable that dispatches tasks to a worker pool. Disposable via `using` or `[Symbol.dispose]()`. 15 23 * ··· 25 33 <T>(task: StreamTask<T>, opts?: ChannelOptions): AsyncIterable<T>; 26 34 /** AbortSignal that fires when the pool starts disposing. */ 27 35 readonly signal: AbortSignal; 36 + /** Read-only array of worker handles, one per pool worker. */ 37 + readonly workers: readonly WorkerHandle[]; 28 38 /** Terminates all workers immediately. */ 29 39 [Symbol.dispose](): void; 30 40 /** Aborts signal, waits for in-flight tasks to settle, then terminates workers. */
+21 -1
src/worker-pool.ts
··· 5 5 import type { Task } from './task.ts'; 6 6 import { StreamTask } from './stream-task.ts'; 7 7 import type { ChannelOptions } from './channel.ts'; 8 - import type { Runner, WorkerOptions } from './runner.ts'; 8 + import type { Runner, WorkerHandle, WorkerOptions } from './runner.ts'; 9 9 10 10 const workerEntryUrl = new URL('./worker-entry.ts', import.meta.url); 11 11 ··· 49 49 return track(execute<T>(worker, task.id, task.args)); 50 50 } 51 51 52 + function makeWorkerHandle(worker: Worker): WorkerHandle { 53 + return { 54 + exec<T>(task: Task<T> | StreamTask<T>, opts?: ChannelOptions): any { 55 + if (task instanceof StreamTask) { 56 + if (disposed) throw new Error('Worker pool is disposed'); 57 + const { iterable, done } = dispatchStream(worker, task.id, task.args, opts); 58 + track(done); 59 + return iterable; 60 + } 61 + if (disposed) return Promise.reject(new Error('Worker pool is disposed')); 62 + return track(execute<T>(worker, task.id, task.args)); 63 + }, 64 + }; 65 + } 66 + 67 + const workerHandles: readonly WorkerHandle[] = pool.map(makeWorkerHandle); 68 + 52 69 const run: Runner = Object.assign( 53 70 (taskOrTasks: Task<any> | Task<any>[] | StreamTask<any>, channelOpts?: ChannelOptions): any => { 54 71 if (taskOrTasks instanceof StreamTask) { ··· 67 84 { 68 85 get signal() { 69 86 return ac.signal; 87 + }, 88 + get workers() { 89 + return workerHandles; 70 90 }, 71 91 [Symbol.dispose]() { 72 92 disposed = true;