Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(channel): respect highWaterMark option

channel(source, { highWaterMark }) now plumbs the threshold through:

- Channel stores it from opts (default 16)
- findReadyConsumer uses it for the cap check
- addConsumer returns it in the per-consumer handle
- prepareArg forwards it in the {__stream__, ...} arg
- Worker portToAsyncIterable receives it; uses it in the cap→below-cap
transition check that signals readySignal
- Legacy message-based path uses it for the pause-threshold too

Two sides both need it (producer scans against it; consumer detects
the transition to signal readiness), so it travels with the arg.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+41 -21
+23 -11
src/channel.ts
··· 9 9 import { collectTransferables } from './transfer.ts'; 10 10 11 11 const CHANNEL = Symbol.for('moroutine.channel'); 12 - const HIGH_WATER = 16; 12 + const DEFAULT_HIGH_WATER = 16; 13 13 14 14 /** Options for configuring a channel. */ 15 15 export interface ChannelOptions { 16 - /** Maximum number of items buffered before signaling backpressure. Defaults to 16. */ 16 + /** Max number of items any one consumer can have in-flight before the 17 + * distributor skips it and gives items to other ready consumers. 18 + * Defaults to 16. */ 17 19 highWaterMark?: number; 18 20 } 19 21 ··· 36 38 private readonly consumers: Consumer[] = []; 37 39 private readonly readySignalBuf: SharedArrayBuffer = new SharedArrayBuffer(4); 38 40 private readonly readySignal: Int32Atomic = new Int32Atomic(this.readySignalBuf, 0); 41 + private readonly highWater: number; 39 42 private pulling = false; 40 43 private done = false; 41 44 private error: Error | null = null; 42 45 readonly [CHANNEL] = true; 43 46 44 - constructor(source: AsyncIterable<T>) { 47 + constructor(source: AsyncIterable<T>, opts?: ChannelOptions) { 48 + this.highWater = opts?.highWaterMark ?? DEFAULT_HIGH_WATER; 45 49 const src = 46 50 source instanceof AsyncIterableTask 47 51 ? (runStreamOnDedicated((source as any).id, (source as any).args) as AsyncIterable<T>) ··· 50 54 } 51 55 52 56 /** Per-consumer handle. Returns the worker-facing port, the consumer's 53 - * private flags (inflight + state) and the channel-wide readySignal. */ 54 - addConsumer(): { port: MessagePort; flags: SharedArrayBuffer; readySignal: SharedArrayBuffer } { 57 + * private flags (inflight + state), the channel-wide readySignal, and 58 + * the channel's highWaterMark (so the worker knows when to signal the 59 + * cap→below-cap transition). */ 60 + addConsumer(): { 61 + port: MessagePort; 62 + flags: SharedArrayBuffer; 63 + readySignal: SharedArrayBuffer; 64 + highWater: number; 65 + } { 55 66 const { port1, port2 } = new MessageChannel(); 56 67 port1.unref(); 57 68 const flags = newPipeFlags(); ··· 65 76 try { 66 77 port1.close(); 67 78 } catch {} 68 - return { port: port2, flags: flags.buffer, readySignal: this.readySignalBuf }; 79 + return { port: port2, flags: flags.buffer, readySignal: this.readySignalBuf, highWater: this.highWater }; 69 80 } 70 81 71 82 const consumer: Consumer = { port: port1, flags }; ··· 85 96 this.readySignal.notify(); 86 97 void this.tryPull(); 87 98 88 - return { port: port2, flags: flags.buffer, readySignal: this.readySignalBuf }; 99 + return { port: port2, flags: flags.buffer, readySignal: this.readySignalBuf, highWater: this.highWater }; 89 100 } 90 101 91 102 private findReadyConsumer(): Consumer | null { 103 + const hw = this.highWater; 92 104 for (let i = 0; i < this.consumers.length; i++) { 93 105 const c = this.consumers[i]; 94 106 if (c.flags.state.load() === CANCEL) continue; 95 - if (c.flags.inflight.load() < HIGH_WATER) return c; 107 + if (c.flags.inflight.load() < hw) return c; 96 108 } 97 109 return null; 98 110 } ··· 167 179 * Creates a channel for streaming values to workers. Supports fan-out when 168 180 * the same channel is passed to multiple tasks. 169 181 * @param iterable - The async iterable or StreamTask to distribute. 170 - * @param _opts - Optional. Reserved for future configuration. 182 + * @param opts - Optional. `highWaterMark` bounds per-consumer in-flight items. 171 183 * @returns A Channel, typed as AsyncIterable<T> for transparent moroutine arg use. 172 184 */ 173 - export function channel<T>(iterable: AsyncIterable<T>, _opts?: ChannelOptions): AsyncIterable<T> { 174 - return new Channel(iterable) as unknown as AsyncIterable<T>; 185 + export function channel<T>(iterable: AsyncIterable<T>, opts?: ChannelOptions): AsyncIterable<T> { 186 + return new Channel(iterable, opts) as unknown as AsyncIterable<T>; 175 187 } 176 188 177 189 export { CHANNEL };
+2 -2
src/execute.ts
··· 66 66 } 67 67 // Channel wrapper — single distributor loop + per-consumer atomics backpressure. 68 68 if (arg instanceof Channel) { 69 - const { port, flags, readySignal } = arg.addConsumer(); 69 + const { port, flags, readySignal, highWater } = arg.addConsumer(); 70 70 streamPortStack[streamPortStack.length - 1].push(port); 71 - return { __stream__: true, port, flags, readySignal }; 71 + return { __stream__: true, port, flags, readySignal, highWater }; 72 72 } 73 73 if (arg instanceof PromiseLikeTask) { 74 74 return { __task__: arg.uid, id: arg.id, args: arg.args.map(prepareArg) };
+16 -8
src/worker-entry.ts
··· 17 17 return typeof arg === 'object' && arg !== null && '__task__' in arg; 18 18 } 19 19 20 - function isStreamArg( 21 - arg: unknown, 22 - ): arg is { __stream__: true; port: MessagePort; flags: SharedArrayBuffer; readySignal?: SharedArrayBuffer } { 20 + function isStreamArg(arg: unknown): arg is { 21 + __stream__: true; 22 + port: MessagePort; 23 + flags: SharedArrayBuffer; 24 + readySignal?: SharedArrayBuffer; 25 + highWater?: number; 26 + } { 23 27 return typeof arg === 'object' && arg !== null && (arg as any).__stream__ === true; 24 28 } 25 29 ··· 47 51 return fn; 48 52 } 49 53 50 - function portToAsyncIterable<T>(port: MessagePort, flags?: PipeFlags, readySignal?: Int32Atomic): AsyncIterable<T> { 54 + function portToAsyncIterable<T>( 55 + port: MessagePort, 56 + flags?: PipeFlags, 57 + readySignal?: Int32Atomic, 58 + highWater: number = 16, 59 + ): AsyncIterable<T> { 51 60 const queue: T[] = []; 52 61 let done = false; 53 62 let error: Error | null = null; ··· 55 64 56 65 // Legacy message-based backpressure (when flags is absent). 57 66 let paused = false; 58 - const HIGH_WATER = 16; 59 67 60 68 port.on('message', (msg: { value?: unknown; done?: boolean; error?: Error }) => { 61 69 if (msg.error) { ··· 80 88 waiting(); 81 89 waiting = null; 82 90 } 83 - if (!flags && !paused && queue.length >= HIGH_WATER) { 91 + if (!flags && !paused && queue.length >= highWater) { 84 92 paused = true; 85 93 port.postMessage('pause'); 86 94 } ··· 100 108 // we only wake it on the cap→below-cap transition. 101 109 const prev = flags.inflight.sub(1); 102 110 if (readySignal) { 103 - if (prev === 16) { 111 + if (prev === highWater) { 104 112 readySignal.add(1); 105 113 readySignal.notify(); 106 114 } ··· 141 149 if (isStreamArg(arg)) { 142 150 const flags = pipeFlagsFromBuffer(arg.flags); 143 151 const readySignal = arg.readySignal ? new Int32Atomic(arg.readySignal, 0) : undefined; 144 - return portToAsyncIterable(arg.port, flags, readySignal); 152 + return portToAsyncIterable(arg.port, flags, readySignal, arg.highWater); 145 153 } 146 154 if (arg instanceof MessagePort) { 147 155 return portToAsyncIterable(arg);