Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: channel-fanout and benchmark examples, fix Moroutine Awaited type

- Add channel-fanout example demonstrating work-stealing fan-out
- Add benchmark example measuring roundtrip channel throughput
- Remove channel() from pipeline example (auto-detection handles it)
- Fix Moroutine type to use Awaited<R> so async tasks resolve correctly
- Fix channel-fanout test types to use destructuring instead of any[]
- Add new examples to README

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+107 -13
+3
README.md
··· 245 245 - [`examples/multi-module`](examples/multi-module) -- moroutines from multiple modules on one worker 246 246 - [`examples/transfer`](examples/transfer) -- zero-copy buffer transfer to and from a worker 247 247 - [`examples/sqlite`](examples/sqlite) -- shared SQLite database on a worker via task-arg caching 248 + - [`examples/pipeline`](examples/pipeline) -- streaming pipeline across dedicated workers 249 + - [`examples/channel-fanout`](examples/channel-fanout) -- fan-out a channel to multiple workers via work stealing 250 + - [`examples/benchmark`](examples/benchmark) -- roundtrip channel throughput with 1–N workers
+46
examples/benchmark/main.ts
··· 1 + // Measures roundtrip channel throughput (main → worker → main) with 2 + // 1 up to availableParallelism() workers. The worker is a passthrough 3 + // streaming moroutine that yields each item back unchanged. 4 + // Requires Node v24+. 5 + // 6 + // Run: node examples/benchmark/main.ts 7 + 8 + import { availableParallelism } from 'node:os'; 9 + import { workers, channel } from '../../src/index.ts'; 10 + import { passthrough } from './work.ts'; 11 + 12 + const ITEMS = 100_000; 13 + const MAX_WORKERS = availableParallelism(); 14 + 15 + async function bench(numWorkers: number): Promise<number> { 16 + async function* source() { 17 + for (let i = 0; i < ITEMS; i++) yield i; 18 + } 19 + 20 + const data = channel(source()); 21 + 22 + using run = workers(numWorkers); 23 + const tasks = Array.from({ length: numWorkers }, () => passthrough(data)); 24 + const streams = tasks.map((t) => run(t)); 25 + 26 + const start = performance.now(); 27 + let count = 0; 28 + await Promise.all( 29 + streams.map(async (s) => { 30 + for await (const _ of s) count++; 31 + }), 32 + ); 33 + const elapsed = performance.now() - start; 34 + 35 + return count / (elapsed / 1000); 36 + } 37 + 38 + console.log(`Roundtrip throughput: ${ITEMS.toLocaleString()} items, 1–${MAX_WORKERS} workers\n`); 39 + 40 + for (let n = 1; n <= MAX_WORKERS; n++) { 41 + const ips = await bench(n); 42 + const bar = '#'.repeat(Math.round(ips / 1000)); 43 + console.log( 44 + ` ${String(n).padStart(2)} worker${n === 1 ? ' ' : 's'} ${Math.round(ips).toLocaleString().padStart(8)} items/s ${bar}`, 45 + ); 46 + }
+10
examples/benchmark/work.ts
··· 1 + import { mo } from '../../src/index.ts'; 2 + 3 + export const passthrough = mo(import.meta, async function* (input: AsyncIterable<number>) { 4 + for await (const n of input) { 5 + // Small CPU cost per item so workers become the bottleneck 6 + let x = n; 7 + for (let i = 0; i < 10_000; i++) x = (x * 1103515245 + 12345) & 0x7fffffff; 8 + yield x; 9 + } 10 + });
+21
examples/channel-fanout/main.ts
··· 1 + // Fan-out a single channel to multiple workers using work stealing. 2 + // Each item goes to whichever worker is ready first. 3 + // Requires Node v24+. 4 + // 5 + // Run: node examples/channel-fanout/main.ts 6 + 7 + import { workers, channel } from '../../src/index.ts'; 8 + import { generate, process } from './work.ts'; 9 + 10 + { 11 + using run = workers(4); 12 + const data = channel(generate(200)); 13 + const results: number[][] = await run([process(data), process(data), process(data), process(data)]); 14 + 15 + for (let i = 0; i < results.length; i++) { 16 + console.log(`Worker ${i}: processed ${results[i].length} items`); 17 + } 18 + 19 + const all = results.flat().sort((a, b) => a - b); 20 + console.log(`\nTotal: ${all.length} items, none lost, none duplicated`); 21 + }
+17
examples/channel-fanout/work.ts
··· 1 + import { setTimeout } from 'node:timers/promises'; 2 + import { mo } from '../../src/index.ts'; 3 + 4 + export const generate = mo(import.meta, async function* (n: number) { 5 + for (let i = 0; i < n; i++) { 6 + yield i; 7 + } 8 + }); 9 + 10 + export const process = mo(import.meta, async (input: AsyncIterable<number>): Promise<number[]> => { 11 + const results: number[] = []; 12 + for await (const n of input) { 13 + await setTimeout(10); // simulate async work 14 + results.push(n); 15 + } 16 + return results; 17 + });
+3 -4
examples/pipeline/main.ts
··· 4 4 // 5 5 // Run: node examples/pipeline/main.ts 6 6 7 - import { channel } from '../../src/index.ts'; 8 7 import { generate, double, square, toString } from './steps.ts'; 9 8 10 9 const numbers = generate(5); 11 - const doubled = double(channel(numbers)); 12 - const squared = square(channel(doubled)); 13 - const labels = toString(channel(squared)); 10 + const doubled = double(numbers); 11 + const squared = square(doubled); 12 + const labels = toString(squared); 14 13 15 14 for await (const label of labels) { 16 15 console.log(label);
+2 -2
src/mo.ts
··· 16 16 type TaskableArgs<A extends unknown[]> = { [K in keyof A]: Arg<A[K]> }; 17 17 18 18 type Moroutine<A extends unknown[], R> = { 19 - (...args: A): Task<R>; 20 - (...args: TaskableArgs<A>): Task<R>; 19 + (...args: A): Task<Awaited<R>>; 20 + (...args: TaskableArgs<A>): Task<Awaited<R>>; 21 21 }; 22 22 23 23 type StreamMoroutine<A extends unknown[], Y> = {
+5 -7
test/channel-fanout.test.ts
··· 8 8 const data = channel(generate(20)); 9 9 const run = workers(4); 10 10 try { 11 - const results: any[] = await run([ 11 + const [a, b, c, d] = await run([ 12 12 collectItems(data), 13 13 collectItems(data), 14 14 collectItems(data), 15 15 collectItems(data), 16 16 ]); 17 - const all = [...results[0], ...results[1], ...results[2], ...results[3]].sort((x: number, y: number) => x - y); 17 + const all = [...a, ...b, ...c, ...d].sort((x, y) => x - y); 18 18 assert.deepEqual(all, Array.from({ length: 20 }, (_, i) => i)); 19 19 } finally { 20 20 run[Symbol.dispose](); ··· 25 25 const data = channel(generate(100)); 26 26 const run = workers(2); 27 27 try { 28 - const results: any[] = await run([ 28 + const [a, b] = await run([ 29 29 collectItems(data), 30 30 collectItems(data), 31 31 ]); 32 - const a: number[] = results[0]; 33 - const b: number[] = results[1]; 34 32 const setA = new Set(a); 35 33 for (const item of b) { 36 34 assert.ok(!setA.has(item), `Item ${item} appeared in both consumers`); ··· 59 57 const data = channel(localGen()); 60 58 const run = workers(2); 61 59 try { 62 - const results: any[] = await run([ 60 + const [a, b] = await run([ 63 61 collectItems(data), 64 62 collectItems(data), 65 63 ]); 66 - const all = [...results[0], ...results[1]].sort((x: number, y: number) => x - y); 64 + const all = [...a, ...b].sort((x, y) => x - y); 67 65 assert.deepEqual(all, Array.from({ length: 10 }, (_, i) => i)); 68 66 } finally { 69 67 run[Symbol.dispose]();