Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

bench: fair 1-direction stream+channel, expand channel fan-out

Add noopStreamGen and noopConsume moroutines + matching benches that
pit 1-direction stream (worker→parent) against 1-direction channel
(parent→worker) on equal footing — both are one cross-thread hop per
item.

Findings (on this branch, HEAD):

stream (worker generates → parent consumes) ~746K items/s
channel (parent generates → worker consumes) ~652K items/s
stream round-trip (parent → worker → parent) ~464K items/s

vs unoptimized (bb41d5d):
stream 1-dir: ~66K (+11.3x)
channel 1-dir: ~67K (+9.7x)

Two takeaways:
- The earlier "channel beats stream" observation was an artifact of
comparing channel's 1-hop case against stream's 2-hop pass-through
case (noopStream consuming an input iterable). When measured
1-direction-vs-1-direction, stream and channel are tied pre-optimization.
- Post-optimization, stream is ~15% faster than channel — that's the
win from atomics-based backpressure over message+adaptive-yield.
Applying atomics to the Distributor would likely close that gap.

Also keeps channelFanout at varying worker counts as a separate
section — shows the Distributor's single-threaded producer bottleneck
(peak at ~2 consumers, regresses past 4).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+52 -6
+41 -6
examples/benchmark-dispatch/main.ts
··· 11 11 // Run: node examples/benchmark-dispatch/main.ts [--atomics] 12 12 13 13 import { availableParallelism } from 'node:os'; 14 - import { workers } from '../../src/index.ts'; 15 - import { noop, noopBatch, noopStream } from './noop.ts'; 14 + import { workers, channel, assign } from '../../src/index.ts'; 15 + import { noop, noopBatch, noopStream, noopStreamGen, noopConsume } from './noop.ts'; 16 16 17 17 const atomicsFlag = process.argv.includes('--atomics'); 18 18 const maxWorkers = availableParallelism(); ··· 67 67 return iters / ((performance.now() - t0) / 1000); 68 68 } 69 69 70 - // Streaming: items flow over a MessageChannel to the worker-side async 71 - // generator. Worker drains at its own pace — no per-item task envelope. 70 + // 1-direction stream: worker generates items, parent consumes. Fair 71 + // comparison to channel (one cross-thread hop per item in both cases). 72 + async function streamingOneWay(iters: number): Promise<number> { 73 + using run = workers(1, poolOpts); 74 + for await (const _ of run(noopStreamGen(3))) void _; // warm 75 + const t0 = performance.now(); 76 + let count = 0; 77 + for await (const _ of run(noopStreamGen(iters))) { 78 + count++; 79 + void _; 80 + } 81 + if (count !== iters) throw new Error(`expected ${iters} items, got ${count}`); 82 + return iters / ((performance.now() - t0) / 1000); 83 + } 84 + 85 + async function channelFanout(iters: number, numWorkers: number): Promise<number> { 86 + using run = workers(numWorkers, poolOpts); 87 + async function* source() { 88 + for (let i = 0; i < iters; i++) yield i; 89 + } 90 + const ch = channel(source()); 91 + const tasks = run.workers.map((w) => assign(w, noopConsume(ch))); 92 + const t0 = performance.now(); 93 + const counts = (await run(tasks)) as number[]; 94 + const total = counts.reduce((a, b) => a + b, 0); 95 + if (total !== iters) throw new Error(`expected ${iters} items, got ${total}`); 96 + return iters / ((performance.now() - t0) / 1000); 97 + } 98 + 99 + // Round-trip stream: parent generates → worker re-yields → parent consumes.
100 + // Two hops per item, so ~half the throughput of a one-way test. 72 101 async function streaming(iters: number): Promise<number> { 73 102 using run = workers(1, poolOpts); 74 103 async function* probe() { ··· 112 141 console.log(` ${String(n).padStart(2)} workers ${Math.round(ips).toLocaleString().padStart(9)} ops/s`); 113 142 } 114 143 144 + console.log('\nfair 1-direction comparison (1 worker, 100K items):'); 145 + const str1 = await streamingOneWay(PAR); 146 + const ch1 = await channelFanout(PAR, 1); 147 + const strRT = await streaming(PAR); 148 + console.log(` stream (worker generates → parent consumes) ${Math.round(str1).toLocaleString().padStart(9)} items/s`); 149 + console.log(` channel (parent generates → worker consumes) ${Math.round(ch1).toLocaleString().padStart(9)} items/s`); 150 + console.log(` round-trip stream (parent → worker → parent, 2 hops) ${Math.round(strRT).toLocaleString().padStart(9)} items/s`); 151 + 115 152 console.log('\nbatch vs stream vs per-task (1 worker, 100K items):'); 116 153 const perTask = await parallel(PAR, 1); 117 154 const bat = await batched(PAR); 118 - const str = await streaming(PAR); 119 155 console.log(` per-task (run(noop(i)) ×N) ${Math.round(perTask).toLocaleString().padStart(9)} items/s`); 120 156 console.log(` batch (run(noopBatch([...N]))) ${Math.round(bat).toLocaleString().padStart(9)} items/s`); 121 - console.log(` stream (run(noopStream(gen()))) ${Math.round(str).toLocaleString().padStart(9)} items/s`);
+11
examples/benchmark-dispatch/noop.ts
··· 13 13 export const noopStream = mo(import.meta, async function* (items: AsyncIterable<number>) { 14 14 for await (const item of items) yield item; 15 15 }); 16 + 17 + // Worker-side generator — produces N items. One-direction: worker → parent. 18 + export const noopStreamGen = mo(import.meta, async function* (n: number) { 19 + for (let i = 0; i < n; i++) yield i; 20 + }); 21 + 22 + export const noopConsume = mo(import.meta, async (items: AsyncIterable<number>): Promise<number> => { 23 + let count = 0; 24 + for await (const _ of items) count++; 25 + return count; 26 + });