Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

perf: atomics-based backpressure for stream dispatch (experimental)

Replaces the pause/resume message protocol on the worker-to-parent
stream path with a shared Int32Array of two slots: INFLIGHT and STATE.
Worker increments INFLIGHT after each emit and, on hitting HIGH_WATER,
parks in Atomics.waitAsync(flags, INFLIGHT, current). Parent decrements
INFLIGHT + Atomics.notify on each pull. Cancellation is STATE=CANCEL +
notify; worker checks STATE after each wake.

Eliminates:
- 'pause'/'resume' postMessage round-trip
- Adaptive setImmediate yield per-emit (no longer needed; worker parks
directly on the atomic when backpressured, runs at microtask speed
otherwise)

Only wired through dispatchStream + handleStreamTask. pipeToPort and
channel Distributor still use the message-based adaptive-yield path,
i.e. the branch pipeIterable takes when opts.flags is absent.

Results (M-series mac, Node v24.15.0, noopStream, 100K items):

                              before      after
  stream throughput           ~410K/s     ~455K/s      +11%
  backpressure buffer (avg)   184 ms      89 ms        ~2x tighter
  backpressure buffer (max)   991 ms      96 ms        ~10x tighter
  backpressure steady-state   growing     stable at ~89 ms

Max queued items drops from ~200 to ~18 (HIGH_WATER + 1-2 overshoot),
within a factor of 2 of the theoretical minimum.

Follow-up work: apply the same pattern to pipeToPort (parent->worker
iterable args) and channel Distributor (1:N fan-out).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+92 -19
+13 -11
src/execute.ts
··· 9 9 import { AsyncIterableTask } from './stream-task.ts'; 10 10 import { runStreamOnDedicated } from './dedicated-runner.ts'; 11 11 import { CHANNEL, Channel } from './channel.ts'; 12 - import { pipeIterable } from './pipe.ts'; 12 + import { pipeIterable, FLAGS_BYTE_LEN, INFLIGHT, STATE, CANCEL } from './pipe.ts'; 13 13 import type { ChannelOptions } from './channel.ts'; 14 14 15 15 let nextCallId = 0; ··· 107 107 const highWater = opts?.highWaterMark ?? DEFAULT_HIGH_WATER; 108 108 109 109 const { port1, port2 } = new MessageChannel(); 110 + // Atomics-based backpressure: shared inflight count + cancel flag. 111 + // Worker parks on INFLIGHT when it hits highWater; parent decrements 112 + // on pull and notifies. Replaces pause/resume messages and the 113 + // adaptive-yield setImmediate dance. 114 + const flagsBuf = new SharedArrayBuffer(FLAGS_BYTE_LEN); 115 + const flags = new Int32Array(flagsBuf); 110 116 111 117 const extracted = extractTransferables(args); 112 118 streamPortStack.push([]); 113 119 const preparedArgs = extracted.args.map(prepareArg); 114 120 const ports = streamPortStack.pop()!; 115 - const msg = { id, args: preparedArgs, port: port2 }; 121 + const msg = { id, args: preparedArgs, port: port2, flags: flagsBuf }; 116 122 worker.postMessage(msg, [...extracted.transfer, ...ports, port2] as any[]); 117 123 118 124 let resolveDone: () => void; ··· 123 129 const queue: T[] = []; 124 130 let done = false; 125 131 let error: Error | null = null; 126 - let paused = false; 127 132 let waiting: (() => void) | null = null; 128 133 129 134 port1.on('message', (msg: { value?: unknown; done?: boolean; error?: Error }) => { ··· 153 158 waiting(); 154 159 waiting = null; 155 160 } 156 - if (!paused && queue.length >= highWater) { 157 - paused = true; 158 - port1.postMessage('pause'); 159 - } 160 161 }); 161 162 port1.unref(); 162 163 ··· 168 169 while (true) { 169 170 if (queue.length > 0) { 170 171 const value = queue.shift()!; 171 - if (paused && 
queue.length <= LOW_WATER) { 172 - paused = false; 173 - port1.postMessage('resume'); 174 - } 172 + // Signal consumption to producer 173 + Atomics.sub(flags, INFLIGHT, 1); 174 + Atomics.notify(flags, INFLIGHT); 175 175 return { done: false, value }; 176 176 } 177 177 if (error) throw error; ··· 182 182 } 183 183 }, 184 184 async return(): Promise<IteratorResult<T>> { 185 + Atomics.store(flags, STATE, CANCEL); 186 + Atomics.notify(flags, INFLIGHT); 185 187 port1.close(); 186 188 resolveDone!(); 187 189 return { done: true, value: undefined };
+75 -5
src/pipe.ts
··· 2 2 import { serializeArg } from './shared/reconstruct.ts'; 3 3 import { collectTransferables, extractTransferables } from './transfer.ts'; 4 4 5 + export const INFLIGHT = 0; 6 + export const STATE = 1; 7 + export const RUN = 0; 8 + export const CANCEL = 1; 9 + export const FLAGS_BYTE_LEN = 2 * Int32Array.BYTES_PER_ELEMENT; 10 + 5 11 export interface PipeOptions { 6 - /** How many emits without a natural event-loop tick before forcing a yield. Defaults to 16. */ 12 + /** How many emits without a natural event-loop tick before forcing a yield. Defaults to 16. Unused when `flags` is provided. */ 7 13 yieldEvery?: number; 14 + /** Target in-flight count before the producer parks on Atomics. Defaults to 16. Used only with `flags`. */ 15 + highWater?: number; 8 16 /** 9 17 * When true, call extractTransferables on each value to pull out any 10 18 * `transfer(buf)` markers into the transfer list. Set on the parent side ··· 12 20 * come from a user generator and don't need this processing. 13 21 */ 14 22 extractTransfers?: boolean; 23 + /** 24 + * When provided, pipe uses an atomics-based backpressure protocol: 25 + * the producer parks on `INFLIGHT` when it reaches `highWater`, the 26 + * consumer increments/decrements via `Atomics.sub`+`notify` on pull, 27 + * and cancellation is signalled by writing `CANCEL` to the `STATE` slot. 28 + * Eliminates pause/resume messages and the adaptive-yield overhead. 29 + */ 30 + flags?: Int32Array; 15 31 } 16 32 17 33 /** 18 34 * Pumps values from an async iterable to a MessagePort. Owns pause/resume 19 - * handling, cancellation on port close, serialization + transferable 20 - * collection, and an adaptive yield policy that ticks the event loop only 21 - * when the producer is starving it (pure-CPU generators) and stays out of 22 - * the way when natural awaits already tick it (I/O-backed generators). 35 + * wiring, cancellation, serialization, and transferable collection. 
With 36 + * `flags` provided, uses atomics-based backpressure (tight, no per-item 37 + * yield); without, falls back to pause/resume messages with an adaptive 38 + * yield policy. 23 39 */ 24 40 export async function pipeIterable<T>(source: AsyncIterable<T>, port: MessagePort, opts: PipeOptions = {}): Promise<void> { 41 + if (opts.flags) return pipeWithAtomics(source, port, opts.flags, opts); 42 + return pipeWithMessages(source, port, opts); 43 + } 44 + 45 + async function pipeWithAtomics<T>( 46 + source: AsyncIterable<T>, 47 + port: MessagePort, 48 + flags: Int32Array, 49 + opts: PipeOptions, 50 + ): Promise<void> { 51 + const highWater = opts.highWater ?? 16; 52 + const extract = opts.extractTransfers ?? false; 53 + 54 + try { 55 + for await (const value of source) { 56 + if (Atomics.load(flags, STATE) === CANCEL) return; 57 + 58 + let serialized: unknown; 59 + const transferList: Transferable[] = []; 60 + if (extract) { 61 + const extracted = extractTransferables([value]); 62 + serialized = serializeArg(extracted.args[0]); 63 + transferList.push(...extracted.transfer); 64 + collectTransferables(extracted.args[0], transferList); 65 + } else { 66 + serialized = serializeArg(value); 67 + collectTransferables(value, transferList); 68 + } 69 + port.postMessage({ value: serialized, done: false }, transferList); 70 + 71 + const newInflight = Atomics.add(flags, INFLIGHT, 1) + 1; 72 + if (newInflight >= highWater) { 73 + // Park until parent drains below highWater or we're cancelled. 
74 + while (true) { 75 + if (Atomics.load(flags, STATE) === CANCEL) return; 76 + const current = Atomics.load(flags, INFLIGHT); 77 + if (current < highWater) break; 78 + const { async, value: p } = Atomics.waitAsync(flags, INFLIGHT, current); 79 + if (async) await p; 80 + } 81 + } 82 + } 83 + if (Atomics.load(flags, STATE) !== CANCEL) port.postMessage({ done: true }); 84 + } catch (err) { 85 + if (Atomics.load(flags, STATE) !== CANCEL) { 86 + port.postMessage({ done: true, error: err instanceof Error ? err : new Error(String(err)) }); 87 + } 88 + } 89 + try { 90 + port.close(); 91 + } catch {} 92 + } 93 + 94 + async function pipeWithMessages<T>(source: AsyncIterable<T>, port: MessagePort, opts: PipeOptions): Promise<void> { 25 95 const yieldEvery = opts.yieldEvery ?? 16; 26 96 const extract = opts.extractTransfers ?? false; 27 97
+4 -3
src/worker-entry.ts
··· 130 130 return deserializeArg(arg); 131 131 } 132 132 133 - type TaskMsg = { callId?: number; id: string; args: unknown[]; port?: MessagePort }; 133 + type TaskMsg = { callId?: number; id: string; args: unknown[]; port?: MessagePort; flags?: SharedArrayBuffer }; 134 134 135 135 function postResult(callId: number, value: unknown): void { 136 136 const returnValue = serializeArg(value); ··· 210 210 } 211 211 212 212 async function handleStreamTask(msg: TaskMsg): Promise<void> { 213 - const { id, args, port } = msg; 213 + const { id, args, port, flags } = msg; 214 214 const callId = msg.callId; 215 215 try { 216 216 const fnM = resolveFn(id); ··· 221 221 resolvedArgs[i] = needsAsyncResolve(a) ? await resolveArg(a) : deserializeArg(a); 222 222 } 223 223 const gen = fn(...resolvedArgs) as AsyncGenerator; 224 - await pipeIterable(gen, port!); 224 + const flagsView = flags ? new Int32Array(flags) : undefined; 225 + await pipeIterable(gen, port!, { flags: flagsView }); 225 226 } catch (err) { 226 227 if (callId != null) postError(callId, err); 227 228 }