Offload functions to worker threads with shared memory primitives for Node.js.
1import { MessageChannel } from 'node:worker_threads';
2import type { MessagePort, Transferable } from 'node:worker_threads';
3import { AsyncIterableTask } from './stream-task.ts';
4import { runStreamOnDedicated } from './dedicated-runner.ts';
5import { newPipeFlags, CANCEL, DEFAULT_HIGH_WATER } from './pipe.ts';
6import type { PipeFlags, StreamHandle } from './pipe.ts';
7import { Int32Atomic } from './shared/int32-atomic.ts';
8import { serializeArg } from './shared/reconstruct.ts';
9import { collectTransferables } from './transfer.ts';
10
/**
 * Brand key stamped on every `Channel` instance. Taken from the global
 * symbol registry (`Symbol.for`), so every lookup of the same key string
 * yields the identical symbol.
 */
const CHANNEL: unique symbol = Symbol.for('moroutine.channel');
12
/**
 * Tuning knobs accepted by a streaming dispatch and by `channel()`.
 */
export interface ChannelOptions {
  /**
   * Upper bound on the number of items that may be in flight on a stream
   * before the producer parks.
   *
   * - Streaming task (`run(streamTask)`): how far ahead of the parent's
   *   consumption the worker is allowed to emit.
   * - `channel()`: the cap applies to each consumer individually. A consumer
   *   sitting at its cap is skipped by the distributor, which hands the item
   *   to another ready consumer instead.
   *
   * Defaults to 16.
   */
  highWaterMark?: number;
}
25
/** Producer-side bookkeeping for a single attached consumer. */
interface Consumer {
  /** Shared flags for this consumer; the channel reads `state` and `inflight` from them. */
  flags: PipeFlags;
  /** Producer end of the consumer's message channel; items and the terminal message are posted here. */
  port: MessagePort;
}
30
/**
 * One-producer, many-consumer source with atomics-based backpressure.
 *
 * A single `tryPull` loop iterates the shared source and dispatches each
 * item to the first below-cap consumer starting from `cursor` (skip-based
 * round-robin). When every consumer is at cap, the loop parks on the shared
 * `readySignal` atomic until it is bumped. This class bumps it whenever a
 * consumer joins or leaves; the consumer side (implemented outside this
 * class) is expected to bump it after pulling from its port and decrementing
 * its `inflight` counter.
 *
 * The source is only advanced while some consumer is ready. An item that was
 * pulled for a consumer which disappeared in the meantime is kept in a
 * one-slot `pending` buffer and handed to the next ready consumer rather
 * than being dropped.
 */
export class Channel<T> {
  /** Iterator over the shared source; advanced only by `tryPull`. */
  private readonly iter: AsyncIterator<T>;
  /** Currently attached consumers, in attachment order. */
  private readonly consumers: Consumer[] = [];
  /** Wake-up counter shared with every consumer through its stream handle. */
  private readonly readySignal: Int32Atomic = new Int32Atomic();
  /** Per-consumer in-flight cap. */
  private readonly highWater: number;
  /** Index at which the next round-robin scan starts. */
  private cursor = 0;
  /** True while a `tryPull` loop is active; guards against a second loop. */
  private pulling = false;
  /** True once the source has completed or failed. */
  private done = false;
  /** Failure that ended the source, if any; replayed to late joiners. */
  private error: Error | null = null;
  /**
   * Item already pulled from the source but not yet handed to a consumer.
   * Boxed so that a `null`/`undefined` item is distinguishable from
   * "nothing pending".
   */
  private pending: { value: T } | null = null;
  readonly [CHANNEL] = true;

  /**
   * @param source - Values to distribute. An `AsyncIterableTask` is first
   *   started through `runStreamOnDedicated` and the resulting stream is
   *   distributed instead.
   * @param opts - Optional. `highWaterMark` overrides the per-consumer cap
   *   (falls back to `DEFAULT_HIGH_WATER`).
   */
  constructor(source: AsyncIterable<T>, opts?: ChannelOptions) {
    this.highWater = opts?.highWaterMark ?? DEFAULT_HIGH_WATER;
    const src =
      source instanceof AsyncIterableTask
        ? (runStreamOnDedicated((source as any).id, (source as any).args) as AsyncIterable<T>)
        : source;
    this.iter = src[Symbol.asyncIterator]();
  }

  /**
   * Attaches a new consumer and returns its stream handle — the complete
   * description of this consumer's producer-facing side of the channel.
   *
   * If the source has already finished, the handle's port receives the
   * terminal message (including the error, if the source failed) and the
   * consumer is never registered.
   *
   * @returns The handle to pass to the consuming side.
   */
  addConsumer(): StreamHandle {
    const { port1, port2 } = new MessageChannel();
    // The producer-side port must not keep the process alive on its own.
    port1.unref();
    const flags = newPipeFlags();

    const handle: StreamHandle = {
      port: port2,
      flags,
      readySignal: this.readySignal,
      highWater: this.highWater,
    };

    if (this.done) {
      // Late joiner: replay how the stream ended, then hang up.
      port1.postMessage(this.terminalMessage());
      try {
        port1.close();
      } catch {
        // Best effort; nothing to do if closing fails.
      }
      return handle;
    }

    const consumer: Consumer = { port: port1, flags };
    this.consumers.push(consumer);

    port1.on('close', () => {
      // Mark cancelled first so a scan that still sees this entry skips it.
      consumer.flags.fields.state.store(CANCEL);
      const idx = this.consumers.indexOf(consumer);
      if (idx >= 0) this.consumers.splice(idx, 1);
      // Wake a parked loop so it re-evaluates the (now smaller) consumer set.
      this.readySignal.add(1);
      this.readySignal.notify();
    });

    // Ensure the pull loop is running; also bump readySignal in case the
    // existing loop is currently parked (the new consumer is ready).
    this.readySignal.add(1);
    this.readySignal.notify();
    void this.tryPull();

    return handle;
  }

  /**
   * Skip-based round-robin: scan from `cursor` and pick the first
   * non-cancelled consumer below cap. The cursor advances past the picked
   * consumer so ties rotate across the below-cap set. Consumers at cap are
   * skipped — this lets faster consumers keep pulling when a peer is
   * saturated, trading strict fairness for resilience and throughput.
   *
   * @returns A ready consumer, or `null` when none is attached or all are
   *   cancelled or at cap.
   */
  private findReadyConsumer(): Consumer | null {
    const hw = this.highWater;
    const n = this.consumers.length;
    if (n === 0) return null;
    for (let k = 0; k < n; k++) {
      const i = (this.cursor + k) % n;
      const c = this.consumers[i];
      if (c.flags.fields.state.load() === CANCEL) continue;
      if (c.flags.fields.inflight.load() < hw) {
        this.cursor = (i + 1) % n;
        return c;
      }
    }
    return null;
  }

  /** True while the consumer is still registered and has not been cancelled. */
  private isLive(consumer: Consumer): boolean {
    return consumer.flags.fields.state.load() !== CANCEL && this.consumers.includes(consumer);
  }

  /** Terminal message describing how the stream ended. */
  private terminalMessage(): { done: true; error?: Error } {
    return this.error ? { done: true, error: this.error } : { done: true };
  }

  /**
   * Marks the channel finished and tells every attached consumer, then
   * closes their ports. Delivery is best effort: one failing port must not
   * prevent the remaining consumers from being notified.
   *
   * @param error - The failure that ended the source, or `null` on normal
   *   completion.
   */
  private finish(error: Error | null): void {
    this.error = error;
    this.done = true;
    this.pending = null;
    const message = this.terminalMessage();
    for (const c of this.consumers) {
      try {
        c.port.postMessage(message);
      } catch {
        // Best effort; keep notifying the others.
      }
      try {
        c.port.close();
      } catch {
        // Best effort; the port may already be closed.
      }
    }
  }

  /**
   * Distribution loop. At most one instance runs at a time (`pulling`).
   * It exits when the source ends or fails, or when no consumers remain;
   * in the latter case `addConsumer` starts it again.
   */
  private async tryPull(): Promise<void> {
    if (this.pulling || this.done) return;
    this.pulling = true;

    try {
      while (!this.done) {
        let consumer = this.findReadyConsumer();
        if (!consumer) {
          // Snapshot the signal BEFORE re-scanning: a bump landing between
          // the scan and the wait then changes the value we wait on, so the
          // wake-up is not lost (assuming `waitAsync(expected)` follows
          // `Atomics.waitAsync` semantics — TODO confirm in Int32Atomic).
          const signal = this.readySignal.load();
          consumer = this.findReadyConsumer();
          if (!consumer) {
            if (this.consumers.length === 0) return;
            await this.readySignal.waitAsync(signal);
            continue;
          }
        }

        let item = this.pending;
        if (item === null) {
          const result = await this.iter.next();
          if (result.done) {
            this.finish(null);
            return;
          }
          item = { value: result.value };
          // The chosen consumer may have closed while we awaited the source.
          // Posting to a closed port is silently discarded, so keep the item
          // for the next ready consumer instead of losing it.
          if (!this.isLive(consumer)) {
            this.pending = item;
            continue;
          }
        }
        this.pending = null;

        const serialized = serializeArg(item.value);
        const transferList: Transferable[] = [];
        collectTransferables(item.value, transferList);
        try {
          consumer.port.postMessage({ value: serialized, done: false }, transferList);
          consumer.flags.fields.inflight.add(1);
        } catch {
          // postMessage threw (for example the payload could not be cloned
          // or transferred). The item is dropped and distribution continues.
          // A consumer that closes after the liveness check above cannot be
          // detected here.
        }
      }
    } catch (err) {
      this.finish(err instanceof Error ? err : new Error(String(err)));
    } finally {
      this.pulling = false;
    }
  }
}
184
/**
 * Wraps an async source in a `Channel` so its values can be streamed to
 * workers. Handing the same channel to several tasks fans the values out
 * across them.
 *
 * @param iterable - The async iterable or StreamTask to distribute.
 * @param opts - Optional. `highWaterMark` bounds per-consumer in-flight items.
 * @returns The new `Channel`, presented as `AsyncIterable<T>` so it can be
 *   passed transparently as a moroutine argument.
 */
export function channel<T>(iterable: AsyncIterable<T>, opts?: ChannelOptions): AsyncIterable<T> {
  // Deliberately erase the concrete type: callers only see AsyncIterable<T>.
  const distributor: unknown = new Channel<T>(iterable, opts);
  return distributor as AsyncIterable<T>;
}
195
196export { CHANNEL };