Offload functions to worker threads with shared memory primitives for Node.js.
1import type { Worker } from 'node:worker_threads';
2import { MessageChannel } from 'node:worker_threads';
3import type { MessagePort, Transferable } from 'node:worker_threads';
4import { transferableAbortSignal } from 'node:util';
5import { freezeModule } from './registry.ts';
6import { serializeArg, deserializeArg } from './shared/reconstruct.ts';
7import { extractTransferables, collectTransferables } from './transfer.ts';
8import { Task } from './task.ts';
9import { StreamTask } from './stream-task.ts';
10import { runStreamOnDedicated } from './dedicated-runner.ts';
11import { CHANNEL, Channel } from './channel.ts';
12import type { ChannelOptions } from './channel.ts';
13
// Next id handed out by execute(); incremented once per call so every in-flight
// call can be matched to its reply.
let nextCallId = 0;
// In-flight execute() calls keyed by callId. execute() adds an entry; the worker
// 'message' handler installed by setupWorker() removes it and settles the promise.
const pending = new Map<number, { resolve: (value: any) => void; reject: (reason: any) => void }>();
// Stack of transfer lists. execute()/dispatchStream() push an empty list before
// mapping their arguments through prepareArg() and pop it afterwards; prepareArg()
// appends every port/signal it creates to the list on top of the stack.
const streamPortStack: MessagePort[][] = [];
17
/**
 * Install the reply handler that settles promises created by `execute()`.
 *
 * Each message posted by the worker is matched to an in-flight call through its
 * `callId`. A reply carrying an `error` string rejects the call with a new
 * `Error`; any other reply resolves it with the deserialized `value`. Replies
 * whose `callId` is not present in `pending` are ignored.
 *
 * @param worker - Worker whose `'message'` events should settle pending calls.
 */
export function setupWorker(worker: Worker): void {
  const settleCall = (reply: { callId: number; value?: unknown; error?: string }): void => {
    const inFlight = pending.get(reply.callId);
    if (inFlight === undefined) return;
    pending.delete(reply.callId);

    // The call is finished: drop the event-loop ref that execute() took for it.
    // NOTE(review): Worker ref()/unref() are documented as idempotent, not counted,
    // so with overlapping calls on one worker the first reply unrefs it while the
    // others are still pending — confirm callers never rely on the ref in that case.
    worker.unref();

    if (reply.error === undefined) {
      inFlight.resolve(deserializeArg(reply.value));
      return;
    }
    inFlight.reject(new Error(reply.error));
  };

  worker.on('message', settleCall);
}
32
// Default buffer size: dispatchStream() sends 'pause' to the producer once this many
// values are queued (overridable via opts.highWaterMark); also passed to pipeToPort().
const DEFAULT_HIGH_WATER = 16;
// dispatchStream() sends 'resume' once a paused stream's queue drains to this many values.
const LOW_WATER = 1;
35
/**
 * Drain an async iterable into a message port, one message per value.
 *
 * Values are posted as `{ value, done: false }`; a normal end posts
 * `{ done: true }` and a thrown error posts `{ done: true, error: message }`.
 * The peer steers the producer by sending the strings `'pause'` / `'resume'`;
 * a closed port cancels the pipe and suppresses any further messages. The port
 * is closed once the pump finishes, whatever the outcome.
 *
 * @param iterable - Source of values to forward.
 * @param port - Local end of the channel the values are posted to.
 * @param highWaterMark - Accepted for the call signature; this function does not read it.
 */
function pipeToPort(iterable: AsyncIterable<unknown>, port: MessagePort, highWaterMark: number): void {
  let isPaused = false;
  let isCancelled = false;
  let wakeProducer: (() => void) | null = null;

  // Release the pump if it is currently parked waiting for a 'resume'.
  const releaseProducer = (): void => {
    if (wakeProducer === null) return;
    wakeProducer();
    wakeProducer = null;
  };

  port.on('message', (signal: string) => {
    switch (signal) {
      case 'pause':
        isPaused = true;
        break;
      case 'resume':
        isPaused = false;
        releaseProducer();
        break;
    }
  });

  port.on('close', () => {
    isCancelled = true;
    // A parked pump must wake up so it can observe the cancellation and stop.
    releaseProducer();
  });

  const pump = async (): Promise<void> => {
    try {
      for await (const item of iterable) {
        if (isCancelled) break;
        if (isPaused) {
          await new Promise<void>((resolve) => {
            wakeProducer = resolve;
          });
        }
        // The port may have closed while the pump was parked.
        if (isCancelled) break;

        const extracted = extractTransferables([item]);
        const payload = extracted.args[0];
        const serialized = serializeArg(payload);
        const transferList: Transferable[] = [...extracted.transfer];
        collectTransferables(payload, transferList);
        port.postMessage({ value: serialized, done: false }, transferList as any[]);
      }
      if (!isCancelled) port.postMessage({ done: true });
    } catch (err) {
      if (!isCancelled) {
        const message = err instanceof Error ? err.message : String(err);
        port.postMessage({ done: true, error: message });
      }
    }
    // Best-effort close; the port may already be closed by the peer.
    try {
      port.close();
    } catch {}
  };

  pump();
}
88
/**
 * Report whether `arg` is a non-null object whose `Symbol.toStringTag`
 * is exactly `'AsyncGenerator'`.
 *
 * @param arg - Any value.
 * @returns `true` only when the tag matches; `false` for primitives and `null`.
 */
function isAsyncGenerator(arg: unknown): boolean {
  if (arg === null || typeof arg !== 'object') return false;
  const tag: unknown = (arg as any)[Symbol.toStringTag];
  return tag === 'AsyncGenerator';
}
92
/**
 * Convert one call argument into the form that is posted to a worker.
 *
 * Must be called while `execute()`/`dispatchStream()` has a transfer list on top
 * of `streamPortStack`; every port or signal created here is appended to that
 * list so the caller can include it in the `postMessage` transfer list.
 *
 * - `AbortSignal`: marked transferable and returned as-is.
 * - async generator object or `StreamTask`: piped through a fresh `MessageChannel`;
 *   the remote port is returned in its place.
 * - `Channel`: a new consumer port obtained from `addConsumer()` is returned.
 * - `Task`: a `{ __task__, id, args }` descriptor with recursively prepared args.
 * - anything else: the result of `serializeArg(arg)`.
 *
 * @param arg - The raw argument supplied by the caller.
 * @returns The value to place in the outgoing message.
 */
function prepareArg(arg: unknown): unknown {
  // Transfer list of the call that is currently preparing its arguments.
  const pendingTransfers = streamPortStack[streamPortStack.length - 1];

  // Pipe `source` through a new channel and hand back the end meant for the worker.
  const pipeOverChannel = (source: AsyncIterable<unknown>): MessagePort => {
    const { port1, port2 } = new MessageChannel();
    pipeToPort(source, port1, DEFAULT_HIGH_WATER);
    // unref() must come AFTER pipeToPort(): attaching its 'message' listener makes
    // Node ref() the port again, which would silently undo an earlier unref() and
    // let an undrained pipe keep the process alive.
    port1.unref();
    pendingTransfers.push(port2);
    return port2;
  };

  // AbortSignal — mark transferable and include it in the transfer list.
  if (arg instanceof AbortSignal) {
    const signal = transferableAbortSignal(arg);
    pendingTransfers.push(signal as unknown as MessagePort);
    return signal;
  }
  // AsyncGenerator — stream its values to the worker.
  if (isAsyncGenerator(arg)) {
    return pipeOverChannel(arg as AsyncIterable<unknown>);
  }
  // StreamTask — run it on a dedicated worker and stream its output.
  if (arg instanceof StreamTask) {
    return pipeOverChannel(runStreamOnDedicated(arg.id, arg.args));
  }
  // Channel wrapper — each use registers one more consumer port.
  if (arg instanceof Channel) {
    const consumerPort = arg.addConsumer();
    pendingTransfers.push(consumerPort);
    return consumerPort;
  }
  // Nested Task — send a descriptor whose own args are prepared the same way.
  if (arg instanceof Task) {
    return { __task__: arg.uid, id: arg.id, args: arg.args.map(prepareArg) };
  }
  return serializeArg(arg);
}
128
/**
 * Run the function identified by `id` on `worker` and resolve with its result.
 *
 * The module part of `id` (everything before the last `#`) is passed to
 * `freezeModule()`, the arguments are prepared for transfer, and a message
 * `{ callId, id, args }` is posted to the worker. The returned promise is settled
 * by the `'message'` handler installed with `setupWorker()`.
 *
 * If preparing the arguments or posting the message throws, the promise rejects
 * with that error and all bookkeeping for the call is rolled back (pending entry,
 * worker ref, transfer-list stack frame), so a failed dispatch cannot keep the
 * process alive or corrupt later calls.
 *
 * @param worker - Worker that should run the call; must have been passed to `setupWorker()`.
 * @param id - Function id; the text before the last `#` is handed to `freezeModule()`.
 * @param args - Arguments for the remote function.
 * @returns Promise for the deserialized result of the remote call.
 */
export function execute<T>(worker: Worker, id: string, args: unknown[]): Promise<T> {
  const url = id.slice(0, id.lastIndexOf('#'));
  freezeModule(url);
  const callId = nextCallId++;
  // Ref the worker for the duration of this call so the event loop stays alive.
  worker.ref();
  return new Promise<T>((resolve, reject) => {
    pending.set(callId, { resolve, reject });
    try {
      const extracted = extractTransferables(args);
      // prepareArg() appends the ports it creates to the list on top of the stack.
      const ports: MessagePort[] = [];
      streamPortStack.push(ports);
      let preparedArgs: unknown[];
      try {
        preparedArgs = extracted.args.map(prepareArg);
      } finally {
        // Always pop, otherwise a throwing argument would leave a stale frame that
        // the next call's prepareArg() would not see but nested calls would inherit.
        streamPortStack.pop();
      }
      const msg = { callId, id, args: preparedArgs };
      worker.postMessage(msg, [...extracted.transfer, ...ports] as any[]);
    } catch (err) {
      // Nothing was dispatched, so no reply will ever arrive to clean up for us.
      pending.delete(callId);
      worker.unref();
      reject(err);
    }
  });
}
145
/**
 * Start a streaming call on `worker` and expose its output as an async iterable.
 *
 * A dedicated `MessageChannel` carries the stream: the worker receives one port
 * in the message `{ id, args, port }` and is expected to post `{ value, done: false }`
 * per item, then `{ done: true }` or `{ done: true, error }`. Received values are
 * buffered; when the buffer reaches the high-water mark the string `'pause'` is
 * sent to the producer, and `'resume'` once it drains to `LOW_WATER`.
 *
 * The worker is ref()ed until the stream ends, fails, or is abandoned through the
 * iterator's `return()`. If preparing the arguments or posting the message throws,
 * the error propagates synchronously after the channel, the worker ref and the
 * transfer-list stack frame have been released.
 *
 * NOTE(review): Worker ref()/unref() are idempotent rather than counted, so a
 * stream ending on a worker that has other calls in flight unrefs it for all of
 * them — confirm callers do not depend on the ref in that situation.
 *
 * @param worker - Worker that runs the streaming function.
 * @param id - Function id; the text before the last `#` is handed to `freezeModule()`.
 * @param args - Arguments for the remote function.
 * @param opts - Optional settings; `highWaterMark` overrides `DEFAULT_HIGH_WATER`.
 * @returns Async iterable over the deserialized values produced by the worker.
 */
export function dispatchStream<T>(
  worker: Worker,
  id: string,
  args: unknown[],
  opts?: ChannelOptions,
): AsyncIterable<T> {
  const url = id.slice(0, id.lastIndexOf('#'));
  freezeModule(url);
  const highWater = opts?.highWaterMark ?? DEFAULT_HIGH_WATER;

  const { port1, port2 } = new MessageChannel();

  const extracted = extractTransferables(args);
  // prepareArg() appends the ports it creates to the list on top of the stack.
  const ports: MessagePort[] = [];
  streamPortStack.push(ports);
  let preparedArgs: unknown[];
  try {
    preparedArgs = extracted.args.map(prepareArg);
  } catch (err) {
    port1.close();
    throw err;
  } finally {
    // Always pop so a throwing argument cannot leave a stale frame behind.
    streamPortStack.pop();
  }
  const msg = { id, args: preparedArgs, port: port2 };
  // Ref the worker for the duration of the stream so the event loop stays alive.
  worker.ref();
  try {
    worker.postMessage(msg, [...extracted.transfer, ...ports, port2] as any[]);
  } catch (err) {
    // Nothing was dispatched: undo the ref and drop the channel before rethrowing.
    worker.unref();
    port1.close();
    throw err;
  }

  const queue: T[] = [];
  let done = false;
  let error: Error | null = null;
  let paused = false;
  let waiting: (() => void) | null = null;
  let workerUnrefed = false;

  function unrefWorkerOnce(): void {
    if (!workerUnrefed) {
      workerUnrefed = true;
      worker.unref();
    }
  }

  // Wake the consumer parked in next(), if there is one.
  function wakeConsumer(): void {
    if (waiting) {
      waiting();
      waiting = null;
    }
  }

  // Mark the stream finished, release the channel and the worker ref, wake the consumer.
  function finish(): void {
    done = true;
    port1.close();
    unrefWorkerOnce();
    wakeConsumer();
  }

  port1.on('message', (incoming: { value?: unknown; done?: boolean; error?: string }) => {
    // Compare against undefined (as setupWorker() does): an error with an empty
    // message is still an error and must not be mistaken for a clean end of stream.
    if (incoming.error !== undefined) {
      error = new Error(incoming.error);
      finish();
      return;
    }
    if (incoming.done) {
      finish();
      return;
    }
    queue.push(deserializeArg(incoming.value) as T);
    wakeConsumer();
    if (!paused && queue.length >= highWater) {
      paused = true;
      port1.postMessage('pause');
    }
  });
  // Unref after the listener is attached; attaching it is what refs the port.
  port1.unref();

  return {
    [Symbol.asyncIterator]() {
      return {
        async next(): Promise<IteratorResult<T>> {
          while (true) {
            if (queue.length > 0) {
              const value = queue.shift()!;
              if (paused && queue.length <= LOW_WATER) {
                paused = false;
                port1.postMessage('resume');
              }
              return { done: false, value };
            }
            if (error) throw error;
            if (done) return { done: true, value: undefined };
            await new Promise<void>((resolve) => {
              waiting = resolve;
            });
          }
        },
        async return(): Promise<IteratorResult<T>> {
          // The consumer is abandoning the stream: discard buffered values and mark
          // it finished so a parked or later next() resolves as done instead of hanging.
          queue.length = 0;
          finish();
          return { done: true, value: undefined };
        },
      };
    },
  };
}
243}