Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(channel): round-robin consumer selection in fan-out

Previously findReadyConsumer scanned from index 0 and picked the first
below-cap consumer, so consumer 0 won nearly every tie and the first
worker received a disproportionate share of items. Now the scan starts
from a rotating cursor, giving every below-cap consumer a turn.

Skip-based RR semantics are preserved — consumers at cap are still
skipped so a saturated worker doesn't stall the pipeline. Adds a
dedicated benchmark (examples/benchmark-dispatch/fanout.ts) that
measures distribution spread at different volumes and under
backpressure; tightens the fan-out tests to use the
run.workers.map(assign) pattern so results don't depend on the pool's
default balancer.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+174 -48
+7
.changeset/channel-fairness.md
··· 1 + --- 2 + 'moroutine': patch 3 + --- 4 + 5 + `channel()` fan-out now rotates consumer selection round-robin instead of always preferring the lowest-index consumer. Previously, when several consumers were below `highWaterMark`, the distributor would pick `consumers[0]` every time, skewing work toward the first worker. The scan now starts from a rotating cursor, so ties distribute evenly. 6 + 7 + Skip-based RR semantics are unchanged — consumers at cap are still skipped so a saturated worker doesn't stall the pipeline. Fairness is best at higher volumes (≥10K items) and under any real backpressure, where initial warm-up asymmetry washes out. See `examples/benchmark-dispatch/fanout.ts` for measured distributions.
+83
examples/benchmark-dispatch/fanout.ts
··· 1 + // Measures channel() fan-out distribution and throughput under various 2 + // load profiles: 3 + // - No backpressure: consumers are trivial, producer races ahead 4 + // - Backpressure: consumers burn CPU per item, producer stalls on caps 5 + // 6 + // Skip-based RR skips consumers at cap, so distribution skew is highest 7 + // when the producer outpaces consumers AND initial warm-up timing varies. 8 + // As N grows the transient fades and counts even out. 9 + // 10 + // Requires Node v24+. 11 + // Run: node examples/benchmark-dispatch/fanout.ts 12 + 13 + import { availableParallelism } from 'node:os'; 14 + import { workers, channel, assign } from '../../src/index.ts'; 15 + import { noopConsume, noopConsumeBusy } from './noop.ts'; 16 + 17 + const maxWorkers = availableParallelism(); 18 + 19 + interface FanoutResult { 20 + counts: number[]; 21 + throughput: number; 22 + elapsedMs: number; 23 + } 24 + 25 + async function fanout(n: number, numWorkers: number): Promise<FanoutResult> { 26 + using run = workers(numWorkers); 27 + async function* source() { 28 + for (let i = 0; i < n; i++) yield i; 29 + } 30 + const ch = channel(source()); 31 + const tasks = run.workers.map((w) => assign(w, noopConsume(ch))); 32 + const t0 = performance.now(); 33 + const counts = (await run(tasks)) as number[]; 34 + const elapsedMs = performance.now() - t0; 35 + const total = counts.reduce((a, b) => a + b, 0); 36 + if (total !== n) throw new Error(`expected ${n} items, got ${total}`); 37 + return { counts, throughput: n / (elapsedMs / 1000), elapsedMs }; 38 + } 39 + 40 + async function fanoutBusy(n: number, numWorkers: number, busyUs: number): Promise<FanoutResult> { 41 + using run = workers(numWorkers); 42 + async function* source() { 43 + for (let i = 0; i < n; i++) yield i; 44 + } 45 + const ch = channel(source()); 46 + const tasks = run.workers.map((w) => assign(w, noopConsumeBusy(ch, busyUs))); 47 + const t0 = performance.now(); 48 + const counts = (await run(tasks)) as number[]; 49 + const elapsedMs = performance.now() - t0; 50 + const total = counts.reduce((a, b) => a + b, 0); 51 + if (total !== n) throw new Error(`expected ${n} items, got ${total}`); 52 + return { counts, throughput: n / (elapsedMs / 1000), elapsedMs }; 53 + } 54 + 55 + function fmtCounts(counts: number[]): string { 56 + const mean = counts.reduce((a, b) => a + b, 0) / counts.length; 57 + const min = Math.min(...counts); 58 + const max = Math.max(...counts); 59 + const spread = ((max - min) / mean) * 100; 60 + return `[${counts.map((c) => c.toString().padStart(6)).join(', ')}] spread ${spread.toFixed(1)}% (min ${min}, max ${max}, mean ${Math.round(mean)})`; 61 + } 62 + 63 + function fmtThroughput(r: FanoutResult): string { 64 + return `${Math.round(r.throughput).toLocaleString().padStart(10)} items/s (${r.elapsedMs.toFixed(1)}ms)`; 65 + } 66 + 67 + const W = 4; 68 + console.log(`workers: ${W} (system has ${maxWorkers} cpus)\n`); 69 + 70 + console.log('no-backpressure fan-out (trivial consumer, producer races ahead):'); 71 + for (const n of [400, 4_000, 40_000, 400_000]) { 72 + const r = await fanout(n, W); 73 + console.log(` n=${n.toString().padStart(7)} ${fmtThroughput(r)} ${fmtCounts(r.counts)}`); 74 + } 75 + 76 + console.log('\nbackpressure fan-out (consumer burns CPU per item):'); 77 + for (const busyUs of [10, 50, 200]) { 78 + const n = busyUs >= 200 ? 4_000 : 20_000; 79 + const r = await fanoutBusy(n, W, busyUs); 80 + console.log( 81 + ` busy=${String(busyUs).padStart(3)}µs n=${n.toString().padStart(6)} ${fmtThroughput(r)} ${fmtCounts(r.counts)}`, 82 + ); 83 + }
+19
examples/benchmark-dispatch/noop.ts
··· 24 24 for await (const _ of items) count++; 25 25 return count; 26 26 }); 27 + 28 + // Consumer that burns CPU for a target number of microseconds per item — 29 + // simulates real work and creates backpressure when the producer can outpace 30 + // it. 31 + export const noopConsumeBusy = mo( 32 + import.meta, 33 + async (items: AsyncIterable<number>, busyUs: number): Promise<number> => { 34 + let count = 0; 35 + const spinNs = BigInt(Math.round(busyUs * 1000)); 36 + for await (const _ of items) { 37 + count++; 38 + const start = process.hrtime.bigint(); 39 + while (process.hrtime.bigint() - start < spinNs) { 40 + /* spin */ 41 + } 42 + } 43 + return count; 44 + }, 45 + );
+2 -3
examples/benchmark/main.ts
··· 6 6 // Run: node examples/benchmark/main.ts 7 7 8 8 import { availableParallelism } from 'node:os'; 9 - import { workers, channel } from '../../src/index.ts'; 9 + import { workers, channel, assign } from '../../src/index.ts'; 10 10 import { passthrough } from './work.ts'; 11 11 12 12 const ITEMS = 100_000; ··· 20 20 const data = channel(source()); 21 21 22 22 using run = workers(numWorkers); 23 - const tasks = Array.from({ length: numWorkers }, () => passthrough(data)); 24 - const streams = tasks.map((t) => run(t)); 23 + const streams = run.workers.map((w) => run(assign(w, passthrough(data)))); 25 24 26 25 const start = performance.now(); 27 26 let count = 0;
+18 -6
src/channel.ts
··· 32 32 * One-producer, many-consumer source with atomics-based backpressure. 33 33 * 34 34 * A single `tryPull` loop iterates the shared source and dispatches each 35 - * item to the first consumer whose `inflight` atomic is below highWater. 36 - * When every consumer is at cap, the loop parks on a shared `readySignal` 37 - * atomic until any consumer pulls from its port (which decrements its 38 - * inflight and bumps readySignal, waking the loop). 35 + * item to the first below-cap consumer starting from `cursor` (skip-based 36 + * round-robin). When every consumer is at cap, the loop parks on a shared 37 + * `readySignal` atomic until any consumer pulls from its port (which 38 + * decrements its inflight and bumps readySignal, waking the loop). 39 39 */ 40 40 export class Channel<T> { 41 41 private readonly iter: AsyncIterator<T>; 42 42 private readonly consumers: Consumer[] = []; 43 43 private readonly readySignal: Int32Atomic = new Int32Atomic(); 44 44 private readonly highWater: number; 45 + private cursor = 0; 45 46 private pulling = false; 46 47 private done = false; 47 48 private error: Error | null = null; ··· 102 103 return handle; 103 104 } 104 105 106 + // Skip-based round-robin: scan from `cursor`, pick the first non-cancelled 107 + // consumer below cap. Cursor advances past the picked consumer so ties 108 + // rotate across the below-cap set. Consumers at cap are skipped — this 109 + // lets faster consumers keep pulling when a peer is saturated, trading 110 + // strict fairness for resilience and throughput. 105 111 private findReadyConsumer(): Consumer | null { 106 112 const hw = this.highWater; 107 - for (let i = 0; i < this.consumers.length; i++) { 113 + const n = this.consumers.length; 114 + if (n === 0) return null; 115 + for (let k = 0; k < n; k++) { 116 + const i = (this.cursor + k) % n; 108 117 const c = this.consumers[i]; 109 118 if (c.flags.fields.state.load() === CANCEL) continue; 110 - if (c.flags.fields.inflight.load() < hw) return c; 119 + if (c.flags.fields.inflight.load() < hw) { 120 + this.cursor = (i + 1) % n; 121 + return c; 122 + } 111 123 } 112 124 return null; 113 125 }
+45 -39
test/channel-fanout.test.ts
··· 1 1 import { describe, it } from 'node:test'; 2 2 import assert from 'node:assert/strict'; 3 - import { workers, channel } from 'moroutine'; 3 + import { workers, channel, assign } from 'moroutine'; 4 4 import { collectItems, generate } from './fixtures/channel-fanout.ts'; 5 5 6 6 describe('channel fan-out', () => { 7 7 it('distributes items across multiple consumers', async () => { 8 8 const data = channel(generate(20)); 9 - const run = workers(4); 10 - try { 11 - const [a, b, c, d] = await run([collectItems(data), collectItems(data), collectItems(data), collectItems(data)]); 12 - const all = [...a, ...b, ...c, ...d].sort((x, y) => x - y); 13 - assert.deepEqual( 14 - all, 15 - Array.from({ length: 20 }, (_, i) => i), 16 - ); 17 - } finally { 18 - run[Symbol.dispose](); 19 - } 9 + using run = workers(4); 10 + const fanout = run.workers.map((w) => assign(w, collectItems(data))); 11 + const [a, b, c, d] = await run(fanout); 12 + const all = [...a, ...b, ...c, ...d].sort((x, y) => x - y); 13 + assert.deepEqual( 14 + all, 15 + Array.from({ length: 20 }, (_, i) => i), 16 + ); 20 17 }); 21 18 22 19 it('each item goes to exactly one consumer', async () => { 23 20 const data = channel(generate(100)); 24 - const run = workers(2); 25 - try { 26 - const [a, b] = await run([collectItems(data), collectItems(data)]); 27 - const setA = new Set(a); 28 - for (const item of b) { 29 - assert.ok(!setA.has(item), `Item ${item} appeared in both consumers`); 30 - } 31 - assert.equal(a.length + b.length, 100); 32 - } finally { 33 - run[Symbol.dispose](); 21 + using run = workers(2); 22 + const fanout = run.workers.map((w) => assign(w, collectItems(data))); 23 + const [a, b] = await run(fanout); 24 + const setA = new Set(a); 25 + for (const item of b) { 26 + assert.ok(!setA.has(item), `Item ${item} appeared in both consumers`); 34 27 } 28 + assert.equal(a.length + b.length, 100); 35 29 }); 36 30 37 31 it('single consumer via channel() still works', async () => { ··· 40 34 yield 2; 41 35 yield 3; 42 36 } 43 - const run = workers(1); 44 - try { 45 - const result = await run(collectItems(channel(numbers()))); 46 - assert.deepEqual(result, [1, 2, 3]); 47 - } finally { 48 - run[Symbol.dispose](); 49 - } 37 + using run = workers(1); 38 + const result = await run(collectItems(channel(numbers()))); 39 + assert.deepEqual(result, [1, 2, 3]); 50 40 }); 51 41 52 42 it('channel with local AsyncIterable fan-out', async () => { ··· 54 44 for (let i = 0; i < 10; i++) yield i; 55 45 } 56 46 const data = channel(localGen()); 57 - const run = workers(2); 58 - try { 59 - const [a, b] = await run([collectItems(data), collectItems(data)]); 60 - const all = [...a, ...b].sort((x, y) => x - y); 61 - assert.deepEqual( 62 - all, 63 - Array.from({ length: 10 }, (_, i) => i), 64 - ); 65 - } finally { 66 - run[Symbol.dispose](); 47 + using run = workers(2); 48 + const fanout = run.workers.map((w) => assign(w, collectItems(data))); 49 + const [a, b] = await run(fanout); 50 + const all = [...a, ...b].sort((x, y) => x - y); 51 + assert.deepEqual( 52 + all, 53 + Array.from({ length: 10 }, (_, i) => i), 54 + ); 55 + }); 56 + 57 + it('does not starve any consumer (skip-based round-robin)', async () => { 58 + async function* localGen() { 59 + for (let i = 0; i < 4000; i++) yield i; 67 60 } 61 + const data = channel(localGen()); 62 + using run = workers(4); 63 + const fanout = run.workers.map((w) => assign(w, collectItems(data))); 64 + const counts = await run(fanout); 65 + const lengths = counts.map((c) => c.length); 66 + assert.equal( 67 + lengths.reduce((a, b) => a + b, 0), 68 + 4000, 69 + ); 70 + // Skip-based RR doesn't guarantee even splits under heterogeneous drain 71 + // rates, but no consumer should be starved to just its initial fill. 72 + const min = Math.min(...lengths); 73 + assert.ok(min >= 100, `expected each consumer ≥ 100 items, got ${lengths.join(',')}`); 68 74 }); 69 75 });