Offload functions to worker threads with shared memory primitives for Node.js.

bench: add dispatch perf tool (per-task, pipelined, parallel, batch, stream)

Measures pure round-trip dispatch overhead with a noop moroutine —
the task does essentially no work, so timings reflect the cost of
postMessage + event loop re-entry per task.

Shapes measured:
- Ping-pong latency (strict await-each, 1 worker)
- Pipelined throughput (1 worker, variable in-flight window)
- Parallel throughput (N workers, Promise.all)
- Batch arg vs per-task vs streaming (1 worker, 100K items)

The batch-vs-stream comparison is particularly informative: batching
100K items into a single task arg is ~34x faster than per-task
dispatch, while the current streaming implementation is ~8x slower
than per-task because of the per-item setImmediate yield used to
keep pause/resume backpressure responsive.
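
To see where that ~8x goes, compare a tight drain loop against one that pays
a macrotask turn per item. A minimal standalone sketch (not part of this
commit; the real streaming path does more work per item than this):

```ts
// Isolates the cost of one setImmediate turn per item, the pattern the
// streaming path uses so pause/resume can interleave between items.
import { setImmediate as yieldTurn } from 'node:timers/promises';

const N = 100_000;

let sum = 0;
let t0 = performance.now();
for (let i = 0; i < N; i++) sum += i; // tight loop: no event-loop turn
const tight = N / ((performance.now() - t0) / 1000);

t0 = performance.now();
for (let i = 0; i < N; i++) await yieldTurn(); // one macrotask per item
const yielded = N / ((performance.now() - t0) / 1000);

console.log(`tight loop: ${Math.round(tight).toLocaleString()} items/s (sum=${sum})`);
console.log(`yield/item: ${Math.round(yielded).toLocaleString()} items/s`);
```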

Used together with docs/atomics-bench/ (local-only research notes)
to evaluate Piscina's atomics technique; the conclusion lives on the
atomics-dispatch branch, and this bench stays useful regardless.
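
For context, the technique under evaluation, as Piscina implements it: the
worker blocks in Atomics.wait between tasks and drains its port synchronously
with receiveMessageOnPort, skipping the event-loop turn that a 'message'
event would cost. A self-contained sketch with illustrative names; nothing
here is this library's API:

```ts
import { Worker, MessageChannel } from 'node:worker_threads';

const { port1, port2 } = new MessageChannel();
const signal = new Int32Array(new SharedArrayBuffer(4));

// Worker: sleep in Atomics.wait, then drain the port synchronously,
// never returning to the event loop between wake-up and task execution.
const worker = new Worker(
  `const { workerData, receiveMessageOnPort } = require('node:worker_threads');
  const { port, signal } = workerData;
  for (;;) {
    Atomics.wait(signal, 0, 0);   // sleep until the main thread notifies
    Atomics.store(signal, 0, 0);  // re-arm for the next wait
    let msg;
    while ((msg = receiveMessageOnPort(port)) !== undefined) {
      if (msg.message === null) process.exit(0);
      port.postMessage(msg.message * 2); // stand-in for running the task
    }
  }`,
  { eval: true, workerData: { port: port2, signal }, transferList: [port2] },
);

function dispatch(task: number): Promise<number> {
  return new Promise((resolve) => {
    port1.once('message', resolve);
    port1.postMessage(task);
    Atomics.store(signal, 0, 1); // flag work, then wake the sleeping worker
    Atomics.notify(signal, 0);
  });
}

console.log(await dispatch(21)); // 42
port1.postMessage(null);         // tell the worker to exit
Atomics.store(signal, 0, 1);
Atomics.notify(signal, 0);
port1.close();
```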

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

examples/benchmark-dispatch/main.ts
```ts
// Measures pure round-trip dispatch overhead for value tasks — a noop
// function is dispatched N times and timed. Surfaces the cost of
// postMessage + event loop re-entry per task.
//
// Five benchmarks:
// - Ping-pong latency: strict await-each-dispatch, 1 worker
// - Pipelined throughput: N in-flight at once, 1 worker
// - Parallel throughput: N in-flight spread across the pool
// - Batch throughput: all items bundled into one task arg, 1 worker
// - Stream throughput: items flow over a MessageChannel, 1 worker
//
// Requires Node v24+.
// Run: node examples/benchmark-dispatch/main.ts [--atomics]

import { availableParallelism } from 'node:os';
import { workers } from '../../src/index.ts';
import { noop, noopBatch, noopStream } from './noop.ts';

const atomicsFlag = process.argv.includes('--atomics');
const maxWorkers = availableParallelism();

// Passing `atomics: true` is forward-compatible — the option is ignored
// until the feature lands, so the same bench file times both modes.
const poolOpts = atomicsFlag ? ({ atomics: true } as any) : {};

async function warmup(run: ReturnType<typeof workers>, n: number) {
  for (let i = 0; i < n; i++) await run(noop(i));
}

async function pingPong(iters: number): Promise<number> {
  using run = workers(1, poolOpts);
  await warmup(run, 100);
  const t0 = performance.now();
  for (let i = 0; i < iters; i++) await run(noop(i));
  return iters / ((performance.now() - t0) / 1000);
}

async function pipelined(iters: number, inflight: number): Promise<number> {
  using run = workers(1, poolOpts);
  await warmup(run, 100);
  const t0 = performance.now();
  let next = 0;
  async function slot() {
    while (next < iters) {
      const i = next++;
      await run(noop(i));
    }
  }
  await Promise.all(Array.from({ length: inflight }, () => slot()));
  return iters / ((performance.now() - t0) / 1000);
}

async function parallel(iters: number, numWorkers: number): Promise<number> {
  using run = workers(numWorkers, poolOpts);
  await warmup(run, 100);
  const t0 = performance.now();
  const promises = Array.from({ length: iters }, (_, i) => run(noop(i)));
  await Promise.all(promises);
  return iters / ((performance.now() - t0) / 1000);
}

// One task carries the whole batch — items are not dispatched individually.
async function batched(iters: number): Promise<number> {
  using run = workers(1, poolOpts);
  await run(noopBatch([1, 2, 3])); // warm the handler
  const items = Array.from({ length: iters }, (_, i) => i);
  const t0 = performance.now();
  await run(noopBatch(items));
  return iters / ((performance.now() - t0) / 1000);
}

// Streaming: items flow over a MessageChannel to the worker-side async
// generator. Worker drains at its own pace — no per-item task envelope.
async function streaming(iters: number): Promise<number> {
  using run = workers(1, poolOpts);
  async function* probe() {
    for (let i = 0; i < 3; i++) yield i;
  }
  for await (const _ of run(noopStream(probe()))) void _; // warm

  async function* gen() {
    for (let i = 0; i < iters; i++) yield i;
  }
  const t0 = performance.now();
  let count = 0;
  for await (const _ of run(noopStream(gen()))) {
    count++;
    void _;
  }
  if (count !== iters) throw new Error(`expected ${iters} items, got ${count}`);
  return iters / ((performance.now() - t0) / 1000);
}

const SEQ = 20_000;
const PIPE = 50_000;
const PAR = 100_000;

console.log(`mode: ${atomicsFlag ? 'atomics' : 'default (postMessage events)'}`);
console.log(`cpus: ${maxWorkers}\n`);

console.log('ping-pong latency (strict await-each, 1 worker):');
const pp = await pingPong(SEQ);
console.log(`  ${SEQ.toLocaleString()} tasks  ${Math.round(pp).toLocaleString().padStart(9)} ops/s  (${(1e6 / pp).toFixed(1)} µs/roundtrip)`);

console.log('\npipelined throughput (1 worker, in-flight window):');
for (const window of [4, 16, 64, 256]) {
  const ips = await pipelined(PIPE, window);
  console.log(`  window=${String(window).padStart(3)}  ${Math.round(ips).toLocaleString().padStart(9)} ops/s`);
}

console.log('\nparallel throughput (N workers, 100K tasks):');
for (const n of [1, 2, 4, maxWorkers]) {
  const ips = await parallel(PAR, n);
  console.log(`  ${String(n).padStart(2)} workers  ${Math.round(ips).toLocaleString().padStart(9)} ops/s`);
}

console.log('\nbatch vs stream vs per-task (1 worker, 100K items):');
const perTask = await parallel(PAR, 1);
const bat = await batched(PAR);
const str = await streaming(PAR);
console.log(`  per-task (run(noop(i)) ×N)        ${Math.round(perTask).toLocaleString().padStart(9)} items/s`);
console.log(`  batch    (run(noopBatch([...N]))) ${Math.round(bat).toLocaleString().padStart(9)} items/s`);
console.log(`  stream   (run(noopStream(gen()))) ${Math.round(str).toLocaleString().padStart(9)} items/s`);
```
examples/benchmark-dispatch/noop.ts
```ts
import { mo } from '../../src/index.ts';

// Tasks that do essentially zero work. Any non-trivial measurement is
// dominated by dispatch overhead (postMessage + event loop re-entry).

// One task per item — full per-task round trip.
export const noop = mo(import.meta, (x: number): number => x);

// One task, N items bundled as an array arg. Worker loops locally, returns once.
export const noopBatch = mo(import.meta, (items: number[]): number[] => items);

// One streaming task; items arrive over a MessageChannel, no per-item task envelope.
export const noopStream = mo(import.meta, async function* (items: AsyncIterable<number>) {
  for await (const item of items) yield item;
});
```
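
For reference, a minimal usage sketch of these factories through the pool API
used in main.ts above (same `workers`/`run` shape, nothing new assumed):

```ts
import { workers } from '../../src/index.ts';
import { noop, noopBatch, noopStream } from './noop.ts';

using run = workers(1);

console.log(await run(noop(42)));             // 42 (one full round trip)
console.log(await run(noopBatch([1, 2, 3]))); // [1, 2, 3] (one round trip total)

async function* src() { yield 1; yield 2; }
for await (const item of run(noopStream(src()))) console.log(item); // 1, then 2
```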