Offload functions to worker threads with shared memory primitives for Node.js.

feat: add inert() and map() helpers for bounded fan-out

- inert(task) returns a plain task descriptor without PromiseLike or
AsyncIterable protocols, safe to yield from an (async) generator
without triggering auto-await
- map(run, items, { concurrency, signal }) dispatches an iterable or
async iterable of tasks to a Runner with bounded concurrency, yielding
results in completion order
- Accepts mixed task types: Task<string> | Task<number> yields string | number
- Supports AbortSignal for stream cancellation; moroutine auto-transfers
signals passed as task args so in-flight work can observe the same abort

Includes test coverage, a bounded-map example that hashes a directory
tree via recursive async generator, and a README section.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+292
+10
.changeset/inert-and-map.md
+ ---
+ 'moroutine': minor
+ ---
+
+ Add `inert()` and `map()` helpers for fan-out over a worker pool
+
+ - `inert(task)` returns a plain task descriptor without `PromiseLike` or `AsyncIterable` protocols — safe to yield from an (async) generator without triggering auto-await
+ - `map(run, items, { concurrency, signal })` dispatches an iterable or async iterable of tasks to a `Runner` with bounded concurrency, yielding results in completion order; accepts mixed task types (`Task<string> | Task<number>` → `string | number`)
+ - New example: `examples/bounded-map` — recursive directory walk hashing every file with bounded concurrency
+ - `Task<T, A>` now carries a type-only arg brand to enable accurate result inference through `map()`; live tasks returned by `mo()` continue to be `PromiseLike<T>` / `AsyncIterable<T>` as before
+1
.nvmrc
+ 24
+48
README.md
  Without `channel()`, `AsyncIterable` and streaming task arguments are auto-detected and streamed to a single consumer. `channel()` is only needed for fan-out.

+ ### `map()` — Bounded Fan-out
+
+ Dispatch a stream of tasks to a pool with bounded concurrency, yielding results in completion order. Wrap each task with `inert()` so it passes through the stream as-is instead of being auto-awaited.
+
+ ```ts
+ // main.ts
+ import { readdir } from 'node:fs/promises';
+ import { join } from 'node:path';
+ import { workers, map, inert } from 'moroutine';
+ import type { Task } from 'moroutine';
+ import { hashFile, type FileHash } from './hash-file.ts';
+
+ {
+   using run = workers();
+   for await (const { path, hash } of map(run, walk('./src'), { concurrency: 4 })) {
+     console.log(`${hash.slice(0, 12)} ${path}`);
+   }
+ }
+
+ async function* walk(dir: string): AsyncGenerator<Task<FileHash>> {
+   for (const entry of await readdir(dir, { withFileTypes: true })) {
+     const p = join(dir, entry.name);
+     if (entry.isDirectory()) {
+       yield* walk(p);
+     } else {
+       yield inert(hashFile(p));
+     }
+   }
+ }
+ ```
+
+ ```ts
+ // hash-file.ts
+ import { readFile } from 'node:fs/promises';
+ import { createHash } from 'node:crypto';
+ import { mo } from 'moroutine';
+
+ export type FileHash = { path: string; hash: string };
+
+ export const hashFile = mo(import.meta, async (path: string): Promise<FileHash> => {
+   const buf = await readFile(path);
+   return { path, hash: createHash('sha256').update(buf).digest('hex') };
+ });
+ ```
+
+ `map()` accepts a sync iterable, async iterable, or generator of tasks. `concurrency` caps in-flight dispatches (default `1`). Mixed task types unify: `map` over `Task<string> | Task<number>` yields `string | number`.
+ An optional `signal` aborts iteration — and, since moroutine auto-transfers `AbortSignal` args, the same signal passed to tasks will also cancel in-flight work.
+
  ### Pipelines

  Chain streaming moroutines by passing one as an argument to the next. Each stage runs on its own dedicated worker.

···

  - [`examples/sqlite`](examples/sqlite) -- shared SQLite database on a worker via task-arg caching
  - [`examples/pipeline`](examples/pipeline) -- streaming pipeline across dedicated workers
  - [`examples/channel-fanout`](examples/channel-fanout) -- fan-out a channel to multiple workers via work stealing
+ - [`examples/bounded-map`](examples/bounded-map) -- bounded-concurrency fan-out with `map()` and a shared abort signal
  - [`examples/load-balancing`](examples/load-balancing) -- round-robin vs least-busy with variable-cost tasks
  - [`examples/benchmark`](examples/benchmark) -- roundtrip channel throughput with 1–N workers
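The abort path described in the README section can be exercised without any workers: `node:stream`'s `Readable.prototype.map` accepts the same `{ signal }` option, and aborting it ends iteration with an `AbortError`. A standalone sketch (the plain async mapper and the `runUntilAborted` name are illustrative stand-ins, not library code):

```typescript
import { Readable } from 'node:stream';
import { setTimeout as sleep } from 'node:timers/promises';

// Sketch: abort a bounded map() mid-stream and observe iteration reject.
export async function runUntilAborted(): Promise<string> {
  const ac = new AbortController();
  const timer = setTimeout(() => ac.abort(), 25);
  // Endless source — only the abort ends the run.
  async function* ticks() {
    for (let i = 0; ; i++) yield i;
  }
  try {
    const mapped = Readable.from(ticks()).map(
      async (n: number) => { await sleep(5); return n; }, // stand-in for a dispatched task
      { concurrency: 2, signal: ac.signal },
    );
    for await (const _ of mapped) { /* consume until aborted */ }
    return 'completed';
  } catch (err) {
    return (err as Error).name; // expected: 'AbortError'
  } finally {
    clearTimeout(timer);
  }
}
```

This is also why a single `AbortController` can serve both roles in the README example: one signal ends the local iteration while the transferred copy cancels in-flight tasks.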
+10
examples/bounded-map/hash-file.ts
+ import { readFile } from 'node:fs/promises';
+ import { createHash } from 'node:crypto';
+ import { mo } from '../../src/index.ts';
+
+ export type FileHash = { path: string; hash: string };
+
+ export const hashFile = mo(import.meta, async (path: string): Promise<FileHash> => {
+   const buf = await readFile(path);
+   return { path, hash: createHash('sha256').update(buf).digest('hex') };
+ });
+35
examples/bounded-map/main.ts
+ // Walk a directory tree with a recursive async generator and hash every file
+ // using map() with bounded concurrency. Results stream back in completion order.
+ // Requires Node v24+.
+ //
+ // Run: node examples/bounded-map/main.ts [dir]
+
+ import { readdir } from 'node:fs/promises';
+ import { join } from 'node:path';
+ import { workers, map, inert } from '../../src/index.ts';
+ import type { Task } from '../../src/index.ts';
+ import { hashFile, type FileHash } from './hash-file.ts';
+
+ const root = process.argv[2] ?? './src';
+
+ {
+   using run = workers();
+   const start = performance.now();
+   let count = 0;
+   for await (const { path, hash } of map(run, walk(root), { concurrency: 4 })) {
+     console.log(`${hash.slice(0, 12)} ${path}`);
+     count++;
+   }
+   console.log(`\nhashed ${count} files in ${(performance.now() - start).toFixed(0)}ms`);
+ }
+
+ async function* walk(dir: string): AsyncGenerator<Task<FileHash>> {
+   for (const entry of await readdir(dir, { withFileTypes: true })) {
+     const p = join(dir, entry.name);
+     if (entry.isDirectory()) {
+       yield* walk(p);
+     } else {
+       yield inert(hashFile(p));
+     }
+   }
+ }
+3
src/index.ts
  export { workers } from './worker-pool.ts';
  export { transfer } from './transfer.ts';
  export { assign } from './assign.ts';
+ export { inert } from './inert.ts';
+ export { map } from './map.ts';
+ export type { MapOptions } from './map.ts';
  export { roundRobin, leastBusy } from './balancers.ts';
  export type { Task, RunResult, Balancer, Runner, WorkerHandle, WorkerOptions } from './runner.ts';
  export {
+14
src/inert.ts
+ import type { Task } from './runner.ts';
+
+ /**
+  * Returns an inert copy of a task — same `uid`, `id`, `args`, and `worker`
+  * but without `PromiseLike` or `AsyncIterable` protocols.
+  * Safe to `yield` from an async generator without triggering auto-await.
+  *
+  * @param task - A task created by a {@link mo}-wrapped function.
+  * @returns A plain object with the same task identity, stripped of any thenable or iterable protocols.
+  */
+ export function inert<T, A extends unknown[] = unknown[]>(task: Task<T, A>): Task<T, A> {
+   const { uid, id, args, worker } = task;
+   return { uid, id, args, worker };
+ }
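The pitfall `inert()` sidesteps can be seen without the library at all: async generators implicitly await any thenable they yield, so a live `PromiseLike` task would be consumed in transit rather than delivered to `map()`. A standalone sketch (the `then` object below is a stand-in for a live task, not moroutine code):

```typescript
// Sketch: async generators auto-await yielded thenables.
// The consumer of yieldsThenable() receives 42 — the object's `then`
// was invoked in transit, exactly what would happen to a live task.
export async function* yieldsThenable() {
  yield { then: (resolve: (v: number) => void) => resolve(42) };
}

// A plain descriptor with no `then` (the shape inert() returns)
// passes through the generator untouched.
export async function* yieldsPlain() {
  yield { id: 'hash-file', args: ['/tmp/a.txt'] };
}
```

Stripping the protocols rather than wrapping the task keeps the descriptor structurally identical, so the `Runner` can dispatch it unchanged.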
+32
src/map.ts
+ import { Readable } from 'node:stream';
+ import type { Task, Runner } from './runner.ts';
+
+ type TaskResult<I> = I extends Task<infer T> ? T : never;
+
+ /** Options controlling how {@link map} dispatches tasks to a {@link Runner}. */
+ export interface MapOptions {
+   /** Maximum number of in-flight tasks at once. Defaults to `1`. */
+   concurrency?: number;
+   /** Signal that, when aborted, stops pulling from `items` and ends iteration. */
+   signal?: AbortSignal;
+ }
+
+ /**
+  * Dispatches tasks yielded by `items` to a worker pool with bounded concurrency,
+  * emitting results in completion order.
+  *
+  * Accepts a union of task types (e.g. `Task<string> | Task<number>`) and emits
+  * the corresponding union of results.
+  *
+  * @param run - A {@link Runner} returned by {@link workers}.
+  * @param items - Iterable or async iterable of inert tasks to dispatch. Use {@link inert} to yield tasks from an async generator without triggering auto-await.
+  * @param opts - Concurrency and cancellation options.
+  * @returns Async iterable of task results in completion order.
+  */
+ export function map<I extends Task<any>>(
+   run: Runner,
+   items: Iterable<I> | AsyncIterable<I>,
+   opts?: MapOptions,
+ ): AsyncIterable<TaskResult<I>> {
+   return Readable.from(items).map(run, opts) as AsyncIterable<TaskResult<I>>;
+ }
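The one-liner body delegates everything to `node:stream`: `Readable.from()` adapts any (a)sync iterable, and `Readable.prototype.map` applies an async function with a bounded number of calls in flight. The same shape can be tried standalone, with a plain async function in place of the `Runner` (names here are illustrative, not library code):

```typescript
import { Readable } from 'node:stream';

// Sketch of the dispatch shape map() delegates to: Readable.map drives an
// async mapper over an async-iterable source with at most `concurrency`
// invocations in flight, and the result is itself async-iterable.
async function* source() {
  for (const n of [1, 2, 3, 4]) yield n;
}

export async function collectSquares(): Promise<number[]> {
  const out: number[] = [];
  const mapped = Readable.from(source()).map(
    async (n: number) => n * n, // stand-in for dispatching one task to a Runner
    { concurrency: 2 },
  );
  for await (const r of mapped) out.push(r);
  return out;
}
```

Because the `Runner` is directly usable as the mapper function, no adapter layer is needed — the wrapper only adds the result-type inference.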
+17
test/fixtures/map.ts
+ import { setTimeout } from 'node:timers/promises';
+ import { mo } from 'moroutine';
+
+ export const delayedSquare = mo(import.meta, async (n: number, ms: number): Promise<number> => {
+   await setTimeout(ms);
+   return n * n;
+ });
+
+ export const toUpper = mo(import.meta, (s: string): string => s.toUpperCase());
+
+ export const waitAborted = mo(import.meta, async (n: number, signal: AbortSignal): Promise<number> => {
+   await new Promise<void>((resolve) => {
+     if (signal.aborted) return resolve();
+     signal.addEventListener('abort', () => resolve());
+   });
+   return n;
+ });
+122
test/map.test.ts
+ import { describe, it } from 'node:test';
+ import assert from 'node:assert/strict';
+ import { inert, map, workers } from 'moroutine';
+ import { delayedSquare, toUpper, waitAborted } from './fixtures/map.ts';
+
+ describe('map', () => {
+   it('dispatches tasks and yields results', async () => {
+     using run = workers(2);
+     async function* tasks() {
+       for (const n of [1, 2, 3, 4]) yield inert(delayedSquare(n, 10));
+     }
+     const results: number[] = [];
+     for await (const r of map(run, tasks(), { concurrency: 2 })) {
+       results.push(r);
+     }
+     assert.deepEqual(
+       results.sort((a, b) => a - b),
+       [1, 4, 9, 16],
+     );
+   });
+
+   it('respects the concurrency limit', async () => {
+     using run = workers(4);
+     const start = performance.now();
+     async function* tasks() {
+       for (let i = 0; i < 8; i++) yield inert(delayedSquare(i, 30));
+     }
+     const results: number[] = [];
+     for await (const r of map(run, tasks(), { concurrency: 2 })) {
+       results.push(r);
+     }
+     assert.equal(results.length, 8);
+     // 8 tasks of ~30ms at concurrency 2 need at least 4 sequential batches;
+     // a generator-side counter can't observe this, but wall time can.
+     const elapsed = performance.now() - start;
+     assert.ok(elapsed >= 100, `expected >= ~120ms for 4 batches, got ${elapsed.toFixed(0)}ms`);
+   });
+
+   it('yields the union of result types for mixed tasks', async () => {
+     using run = workers(2);
+     async function* tasks() {
+       yield inert(delayedSquare(3, 5));
+       yield inert(toUpper('hi'));
+       yield inert(delayedSquare(4, 5));
+       yield inert(toUpper('there'));
+     }
+     const results: Array<string | number> = [];
+     for await (const r of map(run, tasks(), { concurrency: 2 })) {
+       results.push(r);
+     }
+     assert.equal(results.length, 4);
+     assert.equal(results.filter((r) => typeof r === 'number').length, 2);
+     assert.equal(results.filter((r) => typeof r === 'string').length, 2);
+   });
+
+   it('stops iteration when signal aborts', async () => {
+     using run = workers(2);
+     const ac = new AbortController();
+     async function* tasks() {
+       for (let i = 0; i < 100; i++) yield inert(waitAborted(i, ac.signal));
+     }
+     setTimeout(() => ac.abort(), 30);
+     const results: number[] = [];
+     await assert.rejects(
+       async () => {
+         for await (const r of map(run, tasks(), { concurrency: 4, signal: ac.signal })) {
+           results.push(r);
+         }
+       },
+       (err: Error) => err.name === 'AbortError',
+     );
+   });
+
+   it('accepts a sync iterable of tasks', async () => {
+     using run = workers(2);
+     const tasks = [1, 2, 3].map((n) => inert(delayedSquare(n, 5)));
+     const results: number[] = [];
+     for await (const r of map(run, tasks, { concurrency: 2 })) {
+       results.push(r);
+     }
+     assert.deepEqual(
+       results.sort((a, b) => a - b),
+       [1, 4, 9],
+     );
+   });
+
+   it('accepts a sync generator of tasks', async () => {
+     using run = workers(2);
+     function* gen() {
+       for (const n of [2, 4, 6]) yield inert(delayedSquare(n, 5));
+     }
+     const results: number[] = [];
+     for await (const r of map(run, gen(), { concurrency: 2 })) {
+       results.push(r);
+     }
+     assert.deepEqual(
+       results.sort((a, b) => a - b),
+       [4, 16, 36],
+     );
+   });
+
+   it('defaults concurrency to 1', async () => {
+     using run = workers(2);
+     const order: number[] = [];
+     async function* tasks() {
+       for (const n of [0, 1, 2]) {
+         order.push(n);
+         yield inert(delayedSquare(n, 20));
+       }
+     }
+     const results: number[] = [];
+     for await (const r of map(run, tasks())) {
+       results.push(r);
+     }
+     // With concurrency=1, results arrive one at a time and in input order.
+     assert.deepEqual(results, [0, 1, 4]);
+     assert.deepEqual(order, [0, 1, 2]);
+   });
+ });