Offload functions to worker threads with shared memory primitives for Node.js.
1import type { Worker } from 'node:worker_threads';
2import { MessageChannel } from 'node:worker_threads';
3import type { MessagePort, Transferable } from 'node:worker_threads';
4import { transferableAbortSignal } from 'node:util';
5import { freezeModule } from './registry.ts';
6import { serializeArg, deserializeArg } from './shared/reconstruct.ts';
7import { extractTransferables, collectTransferables } from './transfer.ts';
8import { PromiseLikeTask } from './task.ts';
9import { AsyncIterableTask } from './stream-task.ts';
10import { runStreamOnDedicated } from './dedicated-runner.ts';
11import { CHANNEL, Channel } from './channel.ts';
12import type { ChannelOptions } from './channel.ts';
13
14let nextCallId = 0;
15const pending = new Map<number, { resolve: (value: any) => void; reject: (reason: any) => void }>();
16const streamPortStack: MessagePort[][] = [];
17
18export function setupWorker(worker: Worker): void {
19 worker.on('message', (msg: { callId: number; value?: unknown; error?: Error }) => {
20 const call = pending.get(msg.callId);
21 if (!call) return;
22 pending.delete(msg.callId);
23 if (msg.error !== undefined) {
24 call.reject(msg.error);
25 } else {
26 call.resolve(deserializeArg(msg.value));
27 }
28 });
29}
30
31const DEFAULT_HIGH_WATER = 16;
32const LOW_WATER = 1;
33
34function pipeToPort(iterable: AsyncIterable<unknown>, port: MessagePort, highWaterMark: number): void {
35 let paused = false;
36 let resumed: (() => void) | null = null;
37 let cancelled = false;
38
39 port.on('message', (signal: string) => {
40 if (signal === 'pause') {
41 paused = true;
42 } else if (signal === 'resume') {
43 paused = false;
44 if (resumed) {
45 resumed();
46 resumed = null;
47 }
48 }
49 });
50
51 port.on('close', () => {
52 cancelled = true;
53 if (resumed) {
54 resumed();
55 resumed = null;
56 }
57 });
58
59 (async () => {
60 try {
61 for await (const value of iterable) {
62 if (cancelled) break;
63 if (paused)
64 await new Promise<void>((r) => {
65 resumed = r;
66 });
67 if (cancelled) break;
68 const extracted = extractTransferables([value]);
69 const serialized = serializeArg(extracted.args[0]);
70 const transferList: Transferable[] = [...extracted.transfer];
71 collectTransferables(extracted.args[0], transferList);
72 port.postMessage({ value: serialized, done: false }, transferList as any[]);
73 }
74 if (!cancelled) port.postMessage({ done: true });
75 } catch (err) {
76 if (!cancelled) {
77 port.postMessage({ done: true, error: err instanceof Error ? err : new Error(String(err)) });
78 }
79 }
80 try {
81 port.close();
82 } catch {}
83 })();
84}
85
86function isAsyncGenerator(arg: unknown): boolean {
87 return typeof arg === 'object' && arg !== null && (arg as any)[Symbol.toStringTag] === 'AsyncGenerator';
88}
89
90function prepareArg(arg: unknown): unknown {
91 // Auto-detect AbortSignal args — mark transferable and include in transfer list
92 if (arg instanceof AbortSignal) {
93 const signal = transferableAbortSignal(arg);
94 streamPortStack[streamPortStack.length - 1].push(signal as unknown as MessagePort);
95 return signal;
96 }
97 // Auto-detect AsyncGenerator args — pipe via MessageChannel
98 if (isAsyncGenerator(arg)) {
99 const { port1, port2 } = new MessageChannel();
100 port1.unref();
101 pipeToPort(arg as AsyncIterable<unknown>, port1, DEFAULT_HIGH_WATER);
102 streamPortStack[streamPortStack.length - 1].push(port2);
103 return port2;
104 }
105 // Auto-detect StreamTask args — dispatch to dedicated worker, pipe output
106 if (arg instanceof AsyncIterableTask) {
107 const iterable = runStreamOnDedicated(arg.id, arg.args);
108 const { port1, port2 } = new MessageChannel();
109 port1.unref();
110 pipeToPort(iterable, port1, DEFAULT_HIGH_WATER);
111 streamPortStack[streamPortStack.length - 1].push(port2);
112 return port2;
113 }
114 // Channel wrapper — supports fan-out via shared distributor
115 if (arg instanceof Channel) {
116 const port2 = arg.addConsumer();
117 streamPortStack[streamPortStack.length - 1].push(port2);
118 return port2;
119 }
120 if (arg instanceof PromiseLikeTask) {
121 return { __task__: arg.uid, id: arg.id, args: arg.args.map(prepareArg) };
122 }
123 return serializeArg(arg);
124}
125
126export function execute<T>(worker: Worker, id: string, args: unknown[]): Promise<T> {
127 const url = id.slice(0, id.lastIndexOf('#'));
128 freezeModule(url);
129 const callId = nextCallId++;
130 return new Promise<T>((resolve, reject) => {
131 pending.set(callId, { resolve, reject });
132 const extracted = extractTransferables(args);
133 streamPortStack.push([]);
134 const preparedArgs = extracted.args.map(prepareArg);
135 const ports = streamPortStack.pop()!;
136 const msg = { callId, id, args: preparedArgs };
137 worker.postMessage(msg, [...extracted.transfer, ...ports] as any[]);
138 });
139}
140
141export interface StreamDispatch<T> {
142 iterable: AsyncIterable<T>;
143 done: Promise<void>;
144}
145
146export function dispatchStream<T>(
147 worker: Worker,
148 id: string,
149 args: unknown[],
150 opts?: ChannelOptions,
151): StreamDispatch<T> {
152 const url = id.slice(0, id.lastIndexOf('#'));
153 freezeModule(url);
154 const highWater = opts?.highWaterMark ?? DEFAULT_HIGH_WATER;
155
156 const { port1, port2 } = new MessageChannel();
157
158 const extracted = extractTransferables(args);
159 streamPortStack.push([]);
160 const preparedArgs = extracted.args.map(prepareArg);
161 const ports = streamPortStack.pop()!;
162 const msg = { id, args: preparedArgs, port: port2 };
163 worker.postMessage(msg, [...extracted.transfer, ...ports, port2] as any[]);
164
165 let resolveDone: () => void;
166 const donePromise = new Promise<void>((r) => {
167 resolveDone = r;
168 });
169
170 const queue: T[] = [];
171 let done = false;
172 let error: Error | null = null;
173 let paused = false;
174 let waiting: (() => void) | null = null;
175
176 port1.on('message', (msg: { value?: unknown; done?: boolean; error?: Error }) => {
177 if (msg.error) {
178 error = msg.error;
179 done = true;
180 port1.close();
181 resolveDone!();
182 if (waiting) {
183 waiting();
184 waiting = null;
185 }
186 return;
187 }
188 if (msg.done) {
189 done = true;
190 port1.close();
191 resolveDone!();
192 if (waiting) {
193 waiting();
194 waiting = null;
195 }
196 return;
197 }
198 queue.push(deserializeArg(msg.value) as T);
199 if (waiting) {
200 waiting();
201 waiting = null;
202 }
203 if (!paused && queue.length >= highWater) {
204 paused = true;
205 port1.postMessage('pause');
206 }
207 });
208 port1.unref();
209
210 return {
211 iterable: {
212 [Symbol.asyncIterator]() {
213 return {
214 async next(): Promise<IteratorResult<T>> {
215 while (true) {
216 if (queue.length > 0) {
217 const value = queue.shift()!;
218 if (paused && queue.length <= LOW_WATER) {
219 paused = false;
220 port1.postMessage('resume');
221 }
222 return { done: false, value };
223 }
224 if (error) throw error;
225 if (done) return { done: true, value: undefined };
226 await new Promise<void>((resolve) => {
227 waiting = resolve;
228 });
229 }
230 },
231 async return(): Promise<IteratorResult<T>> {
232 port1.close();
233 resolveDone!();
234 return { done: true, value: undefined };
235 },
236 };
237 },
238 },
239 done: donePromise,
240 };
241}