Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 196 lines 6.6 kB view raw
1import { MessageChannel } from 'node:worker_threads'; 2import type { MessagePort, Transferable } from 'node:worker_threads'; 3import { AsyncIterableTask } from './stream-task.ts'; 4import { runStreamOnDedicated } from './dedicated-runner.ts'; 5import { newPipeFlags, CANCEL, DEFAULT_HIGH_WATER } from './pipe.ts'; 6import type { PipeFlags, StreamHandle } from './pipe.ts'; 7import { Int32Atomic } from './shared/int32-atomic.ts'; 8import { serializeArg } from './shared/reconstruct.ts'; 9import { collectTransferables } from './transfer.ts'; 10 11const CHANNEL = Symbol.for('moroutine.channel'); 12 13/** Options for configuring a streaming dispatch or channel. */ 14export interface ChannelOptions { 15 /** 16 * Maximum number of in-flight items on a stream before the producer parks. 17 * For a streaming task (`run(streamTask)`), this is how many items the 18 * worker may have emitted ahead of the parent's consumption. For a 19 * `channel()`, it's the per-consumer cap — the distributor skips any 20 * consumer at cap and gives items to another ready consumer. 21 * Defaults to 16. 22 */ 23 highWaterMark?: number; 24} 25 26interface Consumer { 27 port: MessagePort; 28 flags: PipeFlags; 29} 30 31/** 32 * One-producer, many-consumer source with atomics-based backpressure. 33 * 34 * A single `tryPull` loop iterates the shared source and dispatches each 35 * item to the first below-cap consumer starting from `cursor` (skip-based 36 * round-robin). When every consumer is at cap, the loop parks on a shared 37 * `readySignal` atomic until any consumer pulls from its port (which 38 * decrements its inflight and bumps readySignal, waking the loop). 39 */ 40export class Channel<T> { 41 private readonly iter: AsyncIterator<T>; 42 private readonly consumers: Consumer[] = []; 43 private readonly readySignal: Int32Atomic = new Int32Atomic(); 44 private readonly highWater: number; 45 private cursor = 0; 46 private pulling = false; 47 private done = false; 48 private error: Error | null = null; 49 readonly [CHANNEL] = true; 50 51 constructor(source: AsyncIterable<T>, opts?: ChannelOptions) { 52 this.highWater = opts?.highWaterMark ?? DEFAULT_HIGH_WATER; 53 const src = 54 source instanceof AsyncIterableTask 55 ? (runStreamOnDedicated((source as any).id, (source as any).args) as AsyncIterable<T>) 56 : source; 57 this.iter = src[Symbol.asyncIterator](); 58 } 59 60 /** Per-consumer stream handle — complete description of this consumer's 61 * producer-facing side of the channel. */ 62 addConsumer(): StreamHandle { 63 const { port1, port2 } = new MessageChannel(); 64 port1.unref(); 65 const flags = newPipeFlags(); 66 67 const handle: StreamHandle = { 68 port: port2, 69 flags, 70 readySignal: this.readySignal, 71 highWater: this.highWater, 72 }; 73 74 if (this.done) { 75 if (this.error) { 76 port1.postMessage({ done: true, error: this.error }); 77 } else { 78 port1.postMessage({ done: true }); 79 } 80 try { 81 port1.close(); 82 } catch {} 83 return handle; 84 } 85 86 const consumer: Consumer = { port: port1, flags }; 87 this.consumers.push(consumer); 88 89 port1.on('close', () => { 90 consumer.flags.fields.state.store(CANCEL); 91 const idx = this.consumers.indexOf(consumer); 92 if (idx >= 0) this.consumers.splice(idx, 1); 93 this.readySignal.add(1); 94 this.readySignal.notify(); 95 }); 96 97 // Ensure the pull loop is running; also bump readySignal in case the 98 // existing loop is currently parked (the new consumer is ready). 99 this.readySignal.add(1); 100 this.readySignal.notify(); 101 void this.tryPull(); 102 103 return handle; 104 } 105 106 // Skip-based round-robin: scan from `cursor`, pick the first non-cancelled 107 // consumer below cap. Cursor advances past the picked consumer so ties 108 // rotate across the below-cap set. Consumers at cap are skipped — this 109 // lets faster consumers keep pulling when a peer is saturated, trading 110 // strict fairness for resilience and throughput. 111 private findReadyConsumer(): Consumer | null { 112 const hw = this.highWater; 113 const n = this.consumers.length; 114 if (n === 0) return null; 115 for (let k = 0; k < n; k++) { 116 const i = (this.cursor + k) % n; 117 const c = this.consumers[i]; 118 if (c.flags.fields.state.load() === CANCEL) continue; 119 if (c.flags.fields.inflight.load() < hw) { 120 this.cursor = (i + 1) % n; 121 return c; 122 } 123 } 124 return null; 125 } 126 127 private async tryPull(): Promise<void> { 128 if (this.pulling || this.done) return; 129 this.pulling = true; 130 131 try { 132 while (!this.done) { 133 let consumer = this.findReadyConsumer(); 134 if (!consumer) { 135 const signal = this.readySignal.load(); 136 consumer = this.findReadyConsumer(); 137 if (!consumer) { 138 if (this.consumers.length === 0) return; 139 await this.readySignal.waitAsync(signal); 140 continue; 141 } 142 } 143 144 const result = await this.iter.next(); 145 if (result.done) { 146 this.done = true; 147 for (const c of this.consumers) { 148 try { 149 c.port.postMessage({ done: true }); 150 } catch {} 151 try { 152 c.port.close(); 153 } catch {} 154 } 155 return; 156 } 157 158 const serialized = serializeArg(result.value); 159 const transferList: Transferable[] = []; 160 collectTransferables(result.value, transferList); 161 try { 162 consumer.port.postMessage({ value: serialized, done: false }, transferList); 163 consumer.flags.fields.inflight.add(1); 164 } catch { 165 // Consumer may have closed between scan and post; move on. 166 } 167 } 168 } catch (err) { 169 this.error = err instanceof Error ? err : new Error(String(err)); 170 this.done = true; 171 for (const c of this.consumers) { 172 try { 173 c.port.postMessage({ done: true, error: this.error }); 174 } catch {} 175 try { 176 c.port.close(); 177 } catch {} 178 } 179 } finally { 180 this.pulling = false; 181 } 182 } 183} 184 185/** 186 * Creates a channel for streaming values to workers. Supports fan-out when 187 * the same channel is passed to multiple tasks. 188 * @param iterable - The async iterable or StreamTask to distribute. 189 * @param opts - Optional. `highWaterMark` bounds per-consumer in-flight items. 190 * @returns A Channel, typed as AsyncIterable<T> for transparent moroutine arg use. 191 */ 192export function channel<T>(iterable: AsyncIterable<T>, opts?: ChannelOptions): AsyncIterable<T> { 193 return new Channel(iterable, opts) as unknown as AsyncIterable<T>; 194} 195 196export { CHANNEL };