Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

perf(channel): atomics backpressure via single distributor + shared readySignal

Hybrid design: keep the single tryPull loop on the parent (no
contention on source.next()) but replace message-based pause/resume
with atomics.

- Each consumer has its own PipeFlags (inflight + state); consumer-
side portToAsyncIterable decrements inflight on pull.
- A single Int32Atomic readySignal is shared across all consumers of
the channel. Consumers bump readySignal on the cap→below-cap
transition (only when it matters, to avoid spurious distributor
wakeups). The distributor parks on readySignal when every consumer
is at cap.
- Stream-arg shape extended: {__stream__, port, flags, readySignal?}.
Channel sets readySignal; pipeToPort / worker-generator streams
don't (they use inflight directly).

Tradeoff vs the previous message-based adaptive-yield Distributor:

            before     after    delta
1 consumer  ~614K/s    ~490K/s  -20%
2 consumers ~646K/s    ~680K/s  +5%
4 consumers ~590K/s    ~640K/s  +8%
8 consumers ~412K/s    ~546K/s  +33%

Scales much better past 2 consumers — message-based pause/resume
added per-tick overhead that grew with consumer count. The 1-consumer
regression is the fixed per-item atomic cost (inflight.sub on every
pull, transitions checked). Channel's value is fan-out; a 1-consumer
use case should typically be a regular stream.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+145 -102
+99 -92
src/channel.ts
··· 1 - import { MessageChannel, MessagePort } from 'node:worker_threads'; 2 - import type { Transferable } from 'node:worker_threads'; 3 - import { serializeArg } from './shared/reconstruct.ts'; 4 - import { collectTransferables } from './transfer.ts'; 1 + import { MessageChannel } from 'node:worker_threads'; 2 + import type { MessagePort, Transferable } from 'node:worker_threads'; 5 3 import { AsyncIterableTask } from './stream-task.ts'; 6 4 import { runStreamOnDedicated } from './dedicated-runner.ts'; 5 + import { newPipeFlags, CANCEL } from './pipe.ts'; 6 + import type { PipeFlags } from './pipe.ts'; 7 + import { Int32Atomic } from './shared/int32-atomic.ts'; 8 + import { serializeArg } from './shared/reconstruct.ts'; 9 + import { collectTransferables } from './transfer.ts'; 7 10 8 11 const CHANNEL = Symbol.for('moroutine.channel'); 12 + const HIGH_WATER = 16; 9 13 10 14 /** Options for configuring a channel. */ 11 15 export interface ChannelOptions { ··· 13 17 highWaterMark?: number; 14 18 } 15 19 16 - const DEFAULT_HIGH_WATER = 16; 17 - 18 20 interface Consumer { 19 21 port: MessagePort; 20 - ready: boolean; 22 + flags: PipeFlags; 21 23 } 22 24 23 - class Distributor<T> { 24 - private consumers: Consumer[] = []; 25 - private sourceIterator: AsyncIterator<T>; 25 + /** 26 + * One-producer, many-consumer source with atomics-based backpressure. 27 + * 28 + * A single `tryPull` loop iterates the shared source and dispatches each 29 + * item to the first consumer whose `inflight` atomic is below HIGH_WATER. 30 + * When every consumer is at cap, the loop parks on a shared `readySignal` 31 + * atomic until any consumer pulls from its port (which decrements its 32 + * inflight and bumps readySignal, waking the loop). 
33 + */ 34 + export class Channel<T> { 35 + private readonly iter: AsyncIterator<T>; 36 + private readonly consumers: Consumer[] = []; 37 + private readonly readySignalBuf: SharedArrayBuffer = new SharedArrayBuffer(4); 38 + private readonly readySignal: Int32Atomic = new Int32Atomic(this.readySignalBuf, 0); 39 + private pulling = false; 26 40 private done = false; 27 41 private error: Error | null = null; 28 - private pulling = false; 42 + readonly [CHANNEL] = true; 29 43 30 44 constructor(source: AsyncIterable<T>) { 31 - this.sourceIterator = source[Symbol.asyncIterator](); 45 + const src = 46 + source instanceof AsyncIterableTask 47 + ? (runStreamOnDedicated((source as any).id, (source as any).args) as AsyncIterable<T>) 48 + : source; 49 + this.iter = src[Symbol.asyncIterator](); 32 50 } 33 51 34 - addConsumer(): MessagePort { 52 + /** Per-consumer handle. Returns the worker-facing port, the consumer's 53 + * private flags (inflight + state) and the channel-wide readySignal. */ 54 + addConsumer(): { port: MessagePort; flags: SharedArrayBuffer; readySignal: SharedArrayBuffer } { 35 55 const { port1, port2 } = new MessageChannel(); 36 56 port1.unref(); 37 - 38 - const consumer: Consumer = { port: port1, ready: true }; 39 - 40 - port1.on('message', (signal: string) => { 41 - if (signal === 'pause') { 42 - consumer.ready = false; 43 - } else if (signal === 'resume') { 44 - consumer.ready = true; 45 - this.tryPull(); 46 - } 47 - }); 48 - 49 - port1.on('close', () => { 50 - this.consumers = this.consumers.filter((c) => c !== consumer); 51 - }); 52 - 53 - this.consumers.push(consumer); 57 + const flags = newPipeFlags(); 54 58 55 59 if (this.done) { 56 60 if (this.error) { 57 - port1.postMessage({ done: true, error: this.error.message }); 61 + port1.postMessage({ done: true, error: this.error }); 58 62 } else { 59 63 port1.postMessage({ done: true }); 60 64 } 61 65 try { 62 66 port1.close(); 63 67 } catch {} 64 - } else { 65 - this.tryPull(); 68 + return { port: port2, 
flags: flags.buffer, readySignal: this.readySignalBuf }; 66 69 } 67 70 68 - return port2; 71 + const consumer: Consumer = { port: port1, flags }; 72 + this.consumers.push(consumer); 73 + 74 + port1.on('close', () => { 75 + consumer.flags.state.store(CANCEL); 76 + const idx = this.consumers.indexOf(consumer); 77 + if (idx >= 0) this.consumers.splice(idx, 1); 78 + this.readySignal.add(1); 79 + this.readySignal.notify(); 80 + }); 81 + 82 + // Ensure the pull loop is running; also bump readySignal in case the 83 + // existing loop is currently parked (the new consumer is ready). 84 + this.readySignal.add(1); 85 + this.readySignal.notify(); 86 + void this.tryPull(); 87 + 88 + return { port: port2, flags: flags.buffer, readySignal: this.readySignalBuf }; 89 + } 90 + 91 + private findReadyConsumer(): Consumer | null { 92 + for (let i = 0; i < this.consumers.length; i++) { 93 + const c = this.consumers[i]; 94 + if (c.flags.state.load() === CANCEL) continue; 95 + if (c.flags.inflight.load() < HIGH_WATER) return c; 96 + } 97 + return null; 69 98 } 70 99 71 100 private async tryPull(): Promise<void> { 72 101 if (this.pulling || this.done) return; 73 102 this.pulling = true; 74 103 75 - // Adaptive yield (same pattern as pipeIterable). We can't use that helper 76 - // here because each item goes to a different port (work stealing) and 77 - // pause-state is per-consumer, not per-loop. 78 - const YIELD_EVERY = 16; 79 - let ticked = false; 80 - setImmediate(() => { 81 - ticked = true; 82 - }); 83 - let streak = 0; 84 - 85 104 try { 86 105 while (!this.done) { 87 - const readyConsumer = this.consumers.find((c) => c.ready); 88 - if (!readyConsumer) break; // no ready consumers, pause pulling 106 + let consumer = this.findReadyConsumer(); 107 + if (!consumer) { 108 + // All at cap (or no consumers) — park on readySignal until any 109 + // consumer pulls (decrements inflight + bumps readySignal) or 110 + // a new consumer joins or an existing one closes. 
111 + const signal = this.readySignal.load(); 112 + // Double-check to close the register-then-check race window. 113 + consumer = this.findReadyConsumer(); 114 + if (!consumer) { 115 + if (this.consumers.length === 0) { 116 + // No consumers at all — parking forever would leak the loop. 117 + // Bail; next addConsumer restarts us. 118 + return; 119 + } 120 + await this.readySignal.waitAsync(signal); 121 + continue; 122 + } 123 + } 89 124 90 - const result = await this.sourceIterator.next(); 125 + const result = await this.iter.next(); 91 126 if (result.done) { 92 127 this.done = true; 93 128 for (const c of this.consumers) { 94 - c.port.postMessage({ done: true }); 129 + try { 130 + c.port.postMessage({ done: true }); 131 + } catch {} 95 132 try { 96 133 c.port.close(); 97 134 } catch {} 98 135 } 99 - break; 136 + return; 100 137 } 101 138 102 139 const serialized = serializeArg(result.value); 103 140 const transferList: Transferable[] = []; 104 141 collectTransferables(result.value, transferList); 105 - readyConsumer.port.postMessage({ value: serialized, done: false }, transferList as any[]); 106 - 107 - streak++; 108 - if (ticked) { 109 - ticked = false; 110 - setImmediate(() => { 111 - ticked = true; 112 - }); 113 - streak = 0; 114 - } else if (streak >= YIELD_EVERY) { 115 - await new Promise((r) => setImmediate(r)); 116 - ticked = false; 117 - setImmediate(() => { 118 - ticked = true; 119 - }); 120 - streak = 0; 142 + try { 143 + consumer.port.postMessage({ value: serialized, done: false }, transferList); 144 + consumer.flags.inflight.add(1); 145 + } catch { 146 + // Consumer may have closed between scan and post; move on. 121 147 } 122 148 } 123 149 } catch (err) { 124 - this.done = true; 125 150 this.error = err instanceof Error ? 
err : new Error(String(err)); 151 + this.done = true; 126 152 for (const c of this.consumers) { 127 - c.port.postMessage({ done: true, error: this.error.message }); 153 + try { 154 + c.port.postMessage({ done: true, error: this.error }); 155 + } catch {} 128 156 try { 129 157 c.port.close(); 130 158 } catch {} 131 159 } 160 + } finally { 161 + this.pulling = false; 132 162 } 133 - 134 - this.pulling = false; 135 - } 136 - } 137 - 138 - export class Channel<T> { 139 - private distributor: Distributor<T> | null = null; 140 - private source: AsyncIterable<T>; 141 - readonly [CHANNEL] = true; 142 - 143 - constructor(source: AsyncIterable<T>) { 144 - this.source = source; 145 - } 146 - 147 - addConsumer(): MessagePort { 148 - if (!this.distributor) { 149 - let source: AsyncIterable<T> = this.source; 150 - if (source instanceof AsyncIterableTask) { 151 - source = runStreamOnDedicated((source as any).id, (source as any).args) as AsyncIterable<T>; 152 - } 153 - this.distributor = new Distributor<T>(source); 154 - } 155 - return this.distributor.addConsumer(); 156 163 } 157 164 } 158 165 ··· 160 167 * Creates a channel for streaming values to workers. Supports fan-out when 161 168 * the same channel is passed to multiple tasks. 162 169 * @param iterable - The async iterable or StreamTask to distribute. 163 - * @param opts - Optional. highWaterMark controls backpressure buffering (default: 16). 170 + * @param _opts - Optional. Reserved for future configuration. 164 171 * @returns A Channel, typed as AsyncIterable<T> for transparent moroutine arg use. 165 172 */ 166 - export function channel<T>(iterable: AsyncIterable<T>, opts?: ChannelOptions): AsyncIterable<T> { 173 + export function channel<T>(iterable: AsyncIterable<T>, _opts?: ChannelOptions): AsyncIterable<T> { 167 174 return new Channel(iterable) as unknown as AsyncIterable<T>; 168 175 } 169 176
+4 -4
src/execute.ts
··· 64 64 streamPortStack[streamPortStack.length - 1].push(port2); 65 65 return port2; 66 66 } 67 - // Channel wrapper — supports fan-out via shared distributor 67 + // Channel wrapper — single distributor loop + per-consumer atomics backpressure. 68 68 if (arg instanceof Channel) { 69 - const port2 = arg.addConsumer(); 70 - streamPortStack[streamPortStack.length - 1].push(port2); 71 - return port2; 69 + const { port, flags, readySignal } = arg.addConsumer(); 70 + streamPortStack[streamPortStack.length - 1].push(port); 71 + return { __stream__: true, port, flags, readySignal }; 72 72 } 73 73 if (arg instanceof PromiseLikeTask) { 74 74 return { __task__: arg.uid, id: arg.id, args: arg.args.map(prepareArg) };
+42 -6
src/worker-entry.ts
··· 3 3 import { registry } from './registry.ts'; 4 4 import { deserializeArg, serializeArg } from './shared/reconstruct.ts'; 5 5 import { collectTransferables } from './transfer.ts'; 6 - import { pipeIterable, pipeFlagsFromBuffer } from './pipe.ts'; 6 + import { pipeIterable, pipeFlagsFromBuffer, CANCEL } from './pipe.ts'; 7 + import type { PipeFlags } from './pipe.ts'; 8 + import { Int32Atomic } from './shared/int32-atomic.ts'; 7 9 8 10 const imported = new Set<string>(); 9 11 const taskCache = new Map<number, unknown>(); ··· 15 17 return typeof arg === 'object' && arg !== null && '__task__' in arg; 16 18 } 17 19 20 + function isStreamArg( 21 + arg: unknown, 22 + ): arg is { __stream__: true; port: MessagePort; flags: SharedArrayBuffer; readySignal?: SharedArrayBuffer } { 23 + return typeof arg === 'object' && arg !== null && (arg as any).__stream__ === true; 24 + } 25 + 18 26 function needsAsyncResolve(arg: unknown): boolean { 19 - return arg instanceof MessagePort || isTaskArg(arg); 27 + return arg instanceof MessagePort || isStreamArg(arg) || isTaskArg(arg); 20 28 } 21 29 22 30 // Returns the function synchronously when cached — callers must not await ··· 39 47 return fn; 40 48 } 41 49 42 - function portToAsyncIterable<T>(port: MessagePort): AsyncIterable<T> { 50 + function portToAsyncIterable<T>(port: MessagePort, flags?: PipeFlags, readySignal?: Int32Atomic): AsyncIterable<T> { 43 51 const queue: T[] = []; 44 52 let done = false; 45 53 let error: Error | null = null; 46 - let paused = false; 47 54 let waiting: (() => void) | null = null; 48 55 56 + // Legacy message-based backpressure (when flags is absent). 
57 + let paused = false; 49 58 const HIGH_WATER = 16; 50 59 51 60 port.on('message', (msg: { value?: unknown; done?: boolean; error?: Error }) => { ··· 71 80 waiting(); 72 81 waiting = null; 73 82 } 74 - if (!paused && queue.length >= HIGH_WATER) { 83 + if (!flags && !paused && queue.length >= HIGH_WATER) { 75 84 paused = true; 76 85 port.postMessage('pause'); 77 86 } ··· 84 93 while (true) { 85 94 if (queue.length > 0) { 86 95 const value = queue.shift()!; 87 - if (paused && queue.length <= 1) { 96 + if (flags) { 97 + // Atomics-based backpressure: decrement inflight. In stream 98 + // mode the producer parks on inflight, so we notify it; 99 + // in channel mode the producer parks on readySignal, and 100 + // we only wake it on the cap→below-cap transition. 101 + const prev = flags.inflight.sub(1); 102 + if (readySignal) { 103 + if (prev === 16) { 104 + readySignal.add(1); 105 + readySignal.notify(); 106 + } 107 + } else { 108 + flags.inflight.notify(); 109 + } 110 + } else if (paused && queue.length <= 1) { 88 111 paused = false; 89 112 port.postMessage('resume'); 90 113 } ··· 98 121 } 99 122 }, 100 123 async return(): Promise<IteratorResult<T>> { 124 + if (flags) { 125 + flags.state.store(CANCEL); 126 + flags.inflight.notify(); 127 + if (readySignal) { 128 + readySignal.add(1); 129 + readySignal.notify(); 130 + } 131 + } 101 132 port.close(); 102 133 return { done: true, value: undefined }; 103 134 }, ··· 107 138 } 108 139 109 140 async function resolveArg(arg: unknown): Promise<unknown> { 141 + if (isStreamArg(arg)) { 142 + const flags = pipeFlagsFromBuffer(arg.flags); 143 + const readySignal = arg.readySignal ? new Int32Atomic(arg.readySignal, 0) : undefined; 144 + return portToAsyncIterable(arg.port, flags, readySignal); 145 + } 110 146 if (arg instanceof MessagePort) { 111 147 return portToAsyncIterable(arg); 112 148 }