Offload functions to worker threads with shared memory primitives for Node.js.
1import { MessageChannel, MessagePort } from 'node:worker_threads';
2import type { Transferable } from 'node:worker_threads';
3import { serializeArg } from './shared/reconstruct.ts';
4import { collectTransferables } from './transfer.ts';
5import { AsyncIterableTask } from './stream-task.ts';
6import { runStreamOnDedicated } from './dedicated-runner.ts';
7
/**
 * Registry-wide symbol (obtained through `Symbol.for`) that the `Channel`
 * class sets to `true` on its instances, so a channel can be recognised by
 * this key.
 */
const CHANNEL = Symbol.for('moroutine.channel');

/**
 * Numeric default associated with `ChannelOptions.highWaterMark`.
 * NOTE(review): nothing in the visible part of this file reads this constant —
 * confirm where (or whether) it is applied.
 */
const DEFAULT_HIGH_WATER = 16;

/** Settings that callers may supply when creating a channel. */
export interface ChannelOptions {
  /** Upper bound on buffered items before backpressure is signaled; documented default is 16. */
  highWaterMark?: number;
}

/** Book-keeping record for a single attached consumer. */
interface Consumer {
  /** Distributor-side end of the message channel shared with the consumer. */
  port: MessagePort;
  /** `true` while the consumer accepts values; toggled by its 'pause' / 'resume' signals. */
  ready: boolean;
}
22
/**
 * Pulls values from a single async source and hands each one to exactly one
 * consumer port that is currently marked ready.
 *
 * Wire protocol, as implemented below:
 * - consumer -> distributor: the strings 'pause' and 'resume';
 * - distributor -> consumer: `{ value, done: false }` for each item,
 *   `{ done: true }` at normal end of stream, or
 *   `{ done: true, error: <message> }` when the source (or delivery) failed.
 *   The port is closed right after a `done` message.
 */
class Distributor<T> {
  private consumers: Consumer[] = [];
  private sourceIterator: AsyncIterator<T>;
  private done = false;
  private error: Error | null = null;
  private pulling = false;
  /**
   * A value already pulled from the source that could not be handed out yet
   * because no consumer was ready once the pull completed. Wrapped in an
   * object so that `undefined` / `null` items stay distinguishable from
   * "nothing pending".
   */
  private pending: { value: T } | null = null;

  /**
   * @param source - Async iterable to distribute; its iterator is obtained
   *   immediately, values are only pulled once a consumer is ready.
   */
  constructor(source: AsyncIterable<T>) {
    this.sourceIterator = source[Symbol.asyncIterator]();
  }

  /**
   * Registers a new consumer and starts (or resumes) pulling from the source.
   * If the stream has already finished, the consumer immediately receives the
   * terminal message and its port is closed.
   *
   * @returns The consumer-side port of a fresh `MessageChannel`.
   */
  addConsumer(): MessagePort {
    const { port1, port2 } = new MessageChannel();
    const consumer: Consumer = { port: port1, ready: true };

    port1.on('message', (signal: string) => {
      if (signal === 'pause') {
        consumer.ready = false;
      } else if (signal === 'resume') {
        consumer.ready = true;
        void this.tryPull();
      }
    });

    port1.on('close', () => {
      this.consumers = this.consumers.filter((c) => c !== consumer);
    });

    // Attaching a 'message' listener makes Node ref() the port again, so the
    // unref() has to come after the listeners are in place to have any effect.
    port1.unref();

    this.consumers.push(consumer);

    if (this.done) {
      this.finishConsumer(consumer);
    } else {
      void this.tryPull();
    }

    return port2;
  }

  /**
   * Sends the terminal message (carrying the error text when the stream
   * failed) to one consumer and closes its port.
   */
  private finishConsumer(consumer: Consumer): void {
    if (this.error) {
      consumer.port.postMessage({ done: true, error: this.error.message });
    } else {
      consumer.port.postMessage({ done: true });
    }
    try {
      consumer.port.close();
    } catch {
      // Best effort: a failure to close must not mask the end-of-stream path.
    }
  }

  /**
   * Delivery loop. Only one instance runs at a time (guarded by `pulling`);
   * it stops when the stream ends or when no consumer is ready, and is
   * restarted by `addConsumer` or by a 'resume' signal.
   */
  private async tryPull(): Promise<void> {
    if (this.pulling || this.done) return;
    this.pulling = true;

    try {
      while (!this.done) {
        let target = this.consumers.find((c) => c.ready);
        if (!target) break; // no ready consumers, pause pulling

        let item = this.pending;
        if (item === null) {
          const result = await this.sourceIterator.next();
          if (result.done) {
            this.done = true;
            for (const c of this.consumers) this.finishConsumer(c);
            break;
          }
          item = { value: result.value };

          // The pull may have taken a while: the chosen consumer can have
          // paused or closed in the meantime. Pick again, and if nobody is
          // ready keep the value for the next run instead of losing it.
          if (!target.ready || !this.consumers.includes(target)) {
            target = this.consumers.find((c) => c.ready);
            if (!target) {
              this.pending = item;
              break;
            }
          }
        }
        this.pending = null;

        const serialized = serializeArg(item.value);
        const transferList: Transferable[] = [];
        collectTransferables(item.value, transferList);
        target.port.postMessage({ value: serialized, done: false }, transferList as any[]);

        // After sending, yield to event loop so pause signals can arrive
        await new Promise<void>((resolve) => setImmediate(resolve));
      }
    } catch (err) {
      this.done = true;
      this.pending = null;
      this.error = err instanceof Error ? err : new Error(String(err));
      for (const c of this.consumers) this.finishConsumer(c);
    } finally {
      // Always release the guard, even if notifying consumers threw.
      this.pulling = false;
    }
  }
}
114
/**
 * Handle around an async source whose values are shared between consumers.
 * The underlying `Distributor` is created on the first `addConsumer()` call
 * and reused for every later one. Instances carry the `CHANNEL` symbol key
 * set to `true`.
 */
export class Channel<T> {
  private distributor: Distributor<T> | null = null;
  private source: AsyncIterable<T>;
  readonly [CHANNEL] = true;

  /**
   * @param source - The async iterable whose values will be distributed.
   */
  constructor(source: AsyncIterable<T>) {
    this.source = source;
  }

  /**
   * Attaches one more consumer to this channel.
   *
   * On the first call the distributor is built; when the stored source is an
   * `AsyncIterableTask`, it is replaced by the result of
   * `runStreamOnDedicated` called with the task's `id` and `args`.
   *
   * @returns The port returned by the distributor for the new consumer.
   */
  addConsumer(): MessagePort {
    if (this.distributor === null) {
      const upstream = this.source;
      const resolved: AsyncIterable<T> =
        upstream instanceof AsyncIterableTask
          ? (runStreamOnDedicated((upstream as any).id, (upstream as any).args) as AsyncIterable<T>)
          : upstream;
      this.distributor = new Distributor<T>(resolved);
    }
    return this.distributor.addConsumer();
  }
}
135
/**
 * Wraps an async iterable in a `Channel` so it can be handed to tasks as a
 * streamed argument. Passing the same returned object to several tasks lets
 * them share the one underlying source (fan-out).
 *
 * @param iterable - The async iterable (or stream task) to distribute.
 * @param opts - Optional channel options. NOTE(review): this function does not
 *   read `opts`, so `highWaterMark` has no effect here — confirm whether it is
 *   meant to be forwarded.
 * @returns The new `Channel`, exposed under the `AsyncIterable<T>` type so it
 *   can be passed wherever an async iterable argument is expected.
 */
export function channel<T>(iterable: AsyncIterable<T>, opts?: ChannelOptions): AsyncIterable<T> {
  const wrapped = new Channel<T>(iterable);
  return wrapped as unknown as AsyncIterable<T>;
}
146
147export { CHANNEL };