Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

docs: add streaming/channels to README, TSDoc on public interfaces

- Add Streaming section to README covering streaming moroutines,
channel() fan-out, and pipeline chaining
- Add @param/@returns TSDoc to mo(), channel(), Runner, Task,
StreamTask, ChannelOptions, and transfer()

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+113 -4
+84
README.md
··· 219 219 console.log(pos.load()); // { x: 1, y: 1 } 220 220 ``` 221 221 222 + ## Streaming 223 + 224 + ### Streaming Moroutines 225 + 226 + Wrap an `async function*` with `mo()` to create a streaming moroutine. Values are streamed between threads via `MessageChannel` with pause/resume backpressure. 227 + 228 + ```ts 229 + // count.ts 230 + import { mo } from 'moroutine'; 231 + 232 + export const countUp = mo(import.meta, async function* (n: number) { 233 + for (let i = 0; i < n; i++) { 234 + yield i; 235 + } 236 + }); 237 + ``` 238 + 239 + Iterate directly (dedicated worker) or dispatch via a pool: 240 + 241 + ```ts 242 + import { workers } from 'moroutine'; 243 + import { countUp } from './count.ts'; 244 + 245 + // Dedicated worker 246 + for await (const n of countUp(5)) { 247 + console.log(n); // 0, 1, 2, 3, 4 248 + } 249 + 250 + // Worker pool 251 + { 252 + using run = workers(2); 253 + for await (const n of run(countUp(5))) { 254 + console.log(n); // 0, 1, 2, 3, 4 255 + } 256 + } 257 + ``` 258 + 259 + ### `channel()` and Fan-out 260 + 261 + When you pass the same `AsyncIterable` or `StreamTask` argument to multiple tasks, each task gets its own copy of the data. Use `channel()` to share a single source across multiple workers — each item goes to exactly one consumer (work stealing). 262 + 263 + ```ts 264 + import { workers, channel, mo } from 'moroutine'; 265 + 266 + const generate = mo(import.meta, async function* (n: number) { 267 + for (let i = 0; i < n; i++) yield i; 268 + }); 269 + 270 + const process = mo(import.meta, async (input: AsyncIterable<number>): Promise<number[]> => { 271 + const items: number[] = []; 272 + for await (const n of input) items.push(n); 273 + return items; 274 + }); 275 + ``` 276 + 277 + ```ts 278 + const data = channel(generate(100)); 279 + 280 + { 281 + using run = workers(4); 282 + const [a, b, c, d] = await run([ 283 + process(data), 284 + process(data), 285 + process(data), 286 + process(data), 287 + ]); 288 + // Items distributed across workers — no duplicates, no gaps 289 + } 290 + ``` 291 + 292 + Without `channel()`, `AsyncIterable` and `StreamTask` arguments are auto-detected and streamed to a single consumer. `channel()` is only needed for fan-out. 293 + 294 + ### Pipelines 295 + 296 + Chain streaming moroutines by passing one as an argument to the next. Each stage runs on its own dedicated worker. 297 + 298 + ```ts 299 + const doubled = double(generate(5)); 300 + const squared = square(doubled); 301 + for await (const n of squared) { 302 + console.log(n); 303 + } 304 + ``` 305 + 222 306 ## Transfers 223 307 224 308 Use `transfer()` for zero-copy movement of `ArrayBuffer`, `TypedArray`, `MessagePort`, or streams.
+2
src/channel.ts
··· 7 7 8 8 const CHANNEL = Symbol.for('moroutine.channel'); 9 9 10 + /** Options for configuring a channel. */ 10 11 export interface ChannelOptions { 12 + /** Maximum number of items buffered before signaling backpressure. Defaults to 16. */ 11 13 highWaterMark?: number; 12 14 } 13 15
+2 -1
src/mo.ts
··· 8 8 * Wraps a function to run on a worker thread. Must be called at module scope. 9 9 * @param importMeta - The `import.meta` of the calling module, used to identify the source file. 10 10 * @param fn - The function to offload to a worker thread. 11 - * @returns A function that creates a {@link Task} when called. 11 + * @returns A callable that creates a {@link Task} (or {@link StreamTask} for `async function*`) when invoked. 12 12 */ 13 + 13 14 /** A value or a Task that resolves to that value on the worker. */ 14 15 export type Arg<T> = T | Task<T>; 15 16
+10 -1
src/runner.ts
··· 4 4 5 5 type TaskResults<T extends Task<any>[]> = { [K in keyof T]: T[K] extends Task<infer R> ? R : never }; 6 6 7 - /** A callable that dispatches tasks to a worker pool. Disposable via `using` or `[Symbol.dispose]()`. */ 7 + /** 8 + * A callable that dispatches tasks to a worker pool. Disposable via `using` or `[Symbol.dispose]()`. 9 + * 10 + * @param task - A single {@link Task} to run on a worker. 11 + * @returns `Promise<T>` for a single task, `Promise<[...results]>` for a batch, or `AsyncIterable<T>` for a streaming task. 12 + */ 8 13 export type Runner = { 14 + /** Dispatches a single task and returns its result. */ 9 15 <T>(task: Task<T>): Promise<T>; 16 + /** Dispatches a batch of tasks in parallel and returns all results. */ 10 17 <T extends Task<any>[]>(tasks: [...T]): Promise<TaskResults<T>>; 18 + /** Dispatches a streaming task and returns an async iterable of yielded values. */ 11 19 <T>(task: StreamTask<T>, opts?: ChannelOptions): AsyncIterable<T>; 20 + /** Terminates all workers in the pool. */ 12 21 [Symbol.dispose](): void; 13 22 };
+8 -1
src/stream-task.ts
··· 2 2 3 3 let nextUid = 0; 4 4 5 - /** A deferred streaming computation. When dispatched, returns an AsyncIterable instead of a Promise. */ 5 + /** 6 + * A deferred streaming computation. When dispatched via a {@link Runner} or iterated directly, 7 + * returns an `AsyncIterable` of yielded values instead of a `Promise`. 8 + * Created by calling an `async function*` wrapped with {@link mo}. 9 + */ 6 10 export class StreamTask<T> { 7 11 readonly uid: number; 8 12 readonly id: string; 9 13 readonly args: unknown[]; 10 14 15 + /** @param id - The moroutine identifier (module URL + index). 16 + * @param args - The arguments to pass to the worker generator function. */ 11 17 constructor(id: string, args: unknown[]) { 12 18 this.uid = nextUid++; 13 19 this.id = id; 14 20 this.args = args; 15 21 } 16 22 23 + /** Enables `for await...of` by dispatching to a dedicated worker. @returns An iterator of yielded values. */ 17 24 [Symbol.asyncIterator](): AsyncIterator<T> { 18 25 const iterable = runStreamOnDedicated<T>(this.id, this.args); 19 26 return iterable[Symbol.asyncIterator]();
+7 -1
src/task.ts
··· 2 2 3 3 let nextUid = 0; 4 4 5 - /** A deferred computation that runs on a worker thread when awaited. */ 5 + /** 6 + * A deferred computation that runs on a worker thread when awaited or dispatched via a {@link Runner}. 7 + * Created by calling a function wrapped with {@link mo}. 8 + */ 6 9 export class Task<T> { 7 10 readonly uid: number; 8 11 readonly id: string; 9 12 readonly args: unknown[]; 10 13 14 + /** @param id - The moroutine identifier (module URL + index). 15 + * @param args - The arguments to pass to the worker function. */ 11 16 constructor(id: string, args: unknown[]) { 12 17 this.uid = nextUid++; 13 18 this.id = id; 14 19 this.args = args; 15 20 } 16 21 22 + /** Enables `await task` by dispatching to a dedicated worker. @returns The worker function's result. */ 17 23 then<T1 = T, T2 = never>( 18 24 onfulfilled?: ((value: T) => T1 | PromiseLike<T1>) | null, 19 25 onrejected?: ((reason: any) => T2 | PromiseLike<T2>) | null,