Offload functions to worker threads with shared memory primitives for Node.js.
1import { parentPort, MessagePort } from 'node:worker_threads';
2import type { Transferable } from 'node:worker_threads';
3import { registry } from './registry.ts';
4import { deserializeArg, serializeArg } from './shared/reconstruct.ts';
5import { collectTransferables } from './transfer.ts';
6import { pipeIterable, pipeFlagsFromBuffer, CANCEL } from './pipe.ts';
7import type { PipeFlags } from './pipe.ts';
8import { Int32Atomic } from './shared/int32-atomic.ts';
9
// Module URLs this worker has already passed to `import()`.
const imported: Set<string> = new Set();
// Values produced by nested task arguments, keyed by their `__task__` number.
const taskCache: Map<number, unknown> = new Map();

// Call signature shared by every function looked up by id.
type Fn = (...args: unknown[]) => unknown;
// Functions already resolved by id, so repeat dispatches skip the lookup.
const fnCache: Map<string, Fn> = new Map();
15
// Type guard for nested-task arguments. Only the presence of a `__task__`
// key on a non-null object is checked; `id` and `args` are not validated.
function isTaskArg(arg: unknown): arg is { __task__: number; id: string; args: unknown[] } {
  if (arg === null || typeof arg !== 'object') return false;
  return '__task__' in arg;
}
19
// Type guard for stream arguments: a non-null object whose `__stream__`
// property is strictly `true`. The remaining fields (`port`, `flags`,
// `readySignal`) are trusted, not validated.
function isStreamArg(
  arg: unknown,
): arg is { __stream__: true; port: MessagePort; flags: SharedArrayBuffer; readySignal?: SharedArrayBuffer } {
  if (typeof arg !== 'object' || arg === null) return false;
  // Narrow structural cast instead of `any`, so only this one property is
  // readable without type checking.
  return (arg as { __stream__?: unknown }).__stream__ === true;
}
25
// True when `arg` is a MessagePort, a stream argument, or a nested task.
// Callers send such arguments through `resolveArg` and everything else
// through `deserializeArg`.
function needsAsyncResolve(arg: unknown): boolean {
  if (arg instanceof MessagePort) return true;
  if (isStreamArg(arg)) return true;
  return isTaskArg(arg);
}
29
// Fast path for function lookup. A cache hit is handed back as the bare
// function, not a promise, so callers should branch on `instanceof Promise`
// rather than awaiting unconditionally; an unconditional await would cost a
// microtask hop on every dispatch.
function resolveFn(id: string): Fn | Promise<Fn> {
  return fnCache.get(id) ?? resolveFnSlow(id);
}
37
// Slow path for function lookup: import the module named by the id (at most
// once per URL), then read the function from the registry and memoize it in
// `fnCache`.
//
// The id is split at its last '#'; the text before it is used as the module
// specifier. An id without '#' has no module part, so the import is skipped
// and only the registry is consulted. (Previously `lastIndexOf` returned -1
// and `slice(0, -1)` silently dropped the id's last character, importing a
// mangled specifier.)
//
// Rejects with `Moroutine not found: <id>` when the registry has no entry for
// the id, and with whatever `import()` rejects with when the module fails to
// load.
async function resolveFnSlow(id: string): Promise<Fn> {
  const hashAt = id.lastIndexOf('#');
  if (hashAt !== -1) {
    const url = id.slice(0, hashAt);
    if (!imported.has(url)) {
      // Presumably importing the module registers its functions as a side
      // effect; this block only relies on the registry lookup below.
      await import(url);
      imported.add(url);
    }
  }
  const fn = registry.get(id) as Fn | undefined;
  if (!fn) throw new Error(`Moroutine not found: ${id}`);
  fnCache.set(id, fn);
  return fn;
}
49
// Adapts a MessagePort into an AsyncIterable.
//
// Incoming messages are objects carrying one of:
//   - `error`: recorded and rethrown by `next()` once buffered values drain;
//   - `done`:  ends the iteration once buffered values drain;
//   - `value`: passed through `deserializeArg` and buffered.
//
// Backpressure:
//   - with `flags`: every delivered value decrements `flags.inflight` and
//     wakes the producer (see the comments in `next()`);
//   - without `flags` (legacy): 'pause' is posted on the port when the buffer
//     reaches HIGH_WATER and 'resume' once it has drained to at most one item.
//
// `return()` stores CANCEL in `flags.state` (when flags are present), wakes
// any parked producer, and closes the port.
function portToAsyncIterable<T>(port: MessagePort, flags?: PipeFlags, readySignal?: Int32Atomic): AsyncIterable<T> {
  const buffered: T[] = [];
  let finished = false;
  let failure: Error | null = null;
  let pendingWake: (() => void) | null = null;

  // Legacy message-based backpressure (when flags is absent).
  const HIGH_WATER = 16;
  let paused = false;

  // Release the consumer parked in `next()`, if there is one.
  const wake = (): void => {
    if (pendingWake) {
      pendingWake();
      pendingWake = null;
    }
  };

  port.on('message', (msg: { value?: unknown; done?: boolean; error?: Error }) => {
    if (msg.error || msg.done) {
      // Terminal message: remember the error (if any) and stop.
      if (msg.error) failure = msg.error;
      finished = true;
      wake();
      return;
    }
    buffered.push(deserializeArg(msg.value) as T);
    wake();
    if (!flags && !paused && buffered.length >= HIGH_WATER) {
      paused = true;
      port.postMessage('pause');
    }
  });

  return {
    [Symbol.asyncIterator]() {
      return {
        async next(): Promise<IteratorResult<T>> {
          for (;;) {
            if (buffered.length === 0) {
              // Nothing buffered: surface a terminal state or park until
              // the message handler wakes us.
              if (failure) throw failure;
              if (finished) return { done: true, value: undefined };
              await new Promise<void>((resolve) => {
                pendingWake = resolve;
              });
              continue;
            }

            const value = buffered.shift()!;
            if (!flags) {
              if (paused && buffered.length <= 1) {
                paused = false;
                port.postMessage('resume');
              }
            } else {
              // Atomics-based backpressure: decrement inflight. In stream
              // mode the producer parks on inflight, so we notify it; in
              // channel mode the producer parks on readySignal, and we only
              // wake it on the cap→below-cap transition.
              const before = flags.inflight.sub(1);
              if (!readySignal) {
                flags.inflight.notify();
              } else if (before === 16) {
                readySignal.add(1);
                readySignal.notify();
              }
            }
            return { done: false, value };
          }
        },
        async return(): Promise<IteratorResult<T>> {
          if (flags) {
            // Tell the producer to stop and wake it wherever it is parked.
            flags.state.store(CANCEL);
            flags.inflight.notify();
            if (readySignal) {
              readySignal.add(1);
              readySignal.notify();
            }
          }
          port.close();
          return { done: true, value: undefined };
        },
      };
    },
  };
}
139
// Turns one incoming argument into the value handed to the target function.
//   - stream argument: an AsyncIterable over its port, with flag-based
//     backpressure and an optional ready signal;
//   - bare MessagePort: an AsyncIterable with legacy backpressure;
//   - nested task: the task's function is resolved and invoked with its own
//     (recursively resolved) arguments, and the awaited result is stored in
//     `taskCache` under the task number so later references reuse it;
//   - anything else: `deserializeArg(arg)`.
async function resolveArg(arg: unknown): Promise<unknown> {
  if (isStreamArg(arg)) {
    const pipeFlags = pipeFlagsFromBuffer(arg.flags);
    const signal = arg.readySignal ? new Int32Atomic(arg.readySignal, 0) : undefined;
    return portToAsyncIterable(arg.port, pipeFlags, signal);
  }
  if (arg instanceof MessagePort) return portToAsyncIterable(arg);
  if (!isTaskArg(arg)) return deserializeArg(arg);

  const taskId = arg.__task__;
  if (taskCache.has(taskId)) return taskCache.get(taskId);

  // Resolve the nested task's arguments strictly in order.
  const callArgs: unknown[] = [];
  for (const input of arg.args) {
    callArgs.push(needsAsyncResolve(input) ? await resolveArg(input) : deserializeArg(input));
  }

  const lookup = resolveFn(arg.id);
  const fn = lookup instanceof Promise ? await lookup : lookup;
  const outcome = fn(...callArgs);
  const value = outcome instanceof Promise ? await outcome : outcome;
  taskCache.set(taskId, value);
  return value;
}
168
// Message received from the parent port describing one task to run.
type TaskMsg = {
  // Correlates the posted result or error with the request; may be absent.
  callId?: number;
  // Id of the function to resolve and call.
  id: string;
  // Arguments for the call, still in their transported form.
  args: unknown[];
  // When present, the task is handled as a stream and piped to this port.
  port?: MessagePort;
  // Shared buffer converted with `pipeFlagsFromBuffer` for stream tasks.
  flags?: SharedArrayBuffer;
};
170
// Posts a successful result for `callId` to the parent port. The value is
// serialized with `serializeArg`, and whatever `collectTransferables` gathers
// from the original value is passed as the transfer list.
function postResult(callId: number, value: unknown): void {
  const payload = { callId, value: serializeArg(value) };
  const transfer: Transferable[] = [];
  collectTransferables(value, transfer);
  parentPort!.postMessage(payload, transfer);
}
177
// Posts a failure for `callId` to the parent port. Non-Error values are
// wrapped in an Error built from their string form.
function postError(callId: number, err: unknown): void {
  const error = err instanceof Error ? err : new Error(String(err));
  parentPort!.postMessage({ callId, error });
}
184
// Entry point for value (non-stream) tasks. Written so that a fully
// synchronous chain (cached function, plain arguments, sync function)
// reaches the reply without a single microtask hop; a promise is only
// chained when the function still has to be loaded.
function handleValueTask(msg: TaskMsg): void {
  const callId = msg.callId!;
  try {
    const lookup = resolveFn(msg.id);
    if (!(lookup instanceof Promise)) {
      // Cache hit: stay synchronous.
      invokeWithArgs(callId, lookup, msg.args);
      return;
    }
    lookup.then(
      (fn) => invokeWithArgs(callId, fn, msg.args),
      (err) => postError(callId, err),
    );
  } catch (err) {
    postError(callId, err);
  }
}
203
// Resolves `args` and then invokes `fn` via `invokeAndRespond`.
//
// Arguments are deserialized synchronously, left to right, until one needs
// async resolution. If none does, the call happens synchronously; otherwise
// the remaining arguments are finished inside an async continuation that
// picks up at the same index. Any failure is reported with `postError`.
function invokeWithArgs(callId: number, fn: Fn, args: unknown[]): void {
  try {
    const resolved = new Array(args.length);
    let cursor = 0;

    // Sync prologue: stop at the first argument that needs awaiting.
    while (cursor < args.length) {
      const candidate = args[cursor];
      if (needsAsyncResolve(candidate)) break;
      resolved[cursor] = deserializeArg(candidate);
      cursor += 1;
    }

    if (cursor === args.length) {
      invokeAndRespond(callId, fn, resolved);
      return;
    }

    // Async tail: continue from `cursor`, awaiting only where required.
    const finishAsync = async (): Promise<void> => {
      while (cursor < args.length) {
        const pending = args[cursor];
        resolved[cursor] = needsAsyncResolve(pending) ? await resolveArg(pending) : deserializeArg(pending);
        cursor += 1;
      }
      invokeAndRespond(callId, fn, resolved);
    };
    finishAsync().catch((err) => postError(callId, err));
  } catch (err) {
    postError(callId, err);
  }
}
231
// Calls `fn(...args)` and reports the outcome for `callId`.
//
// A synchronous return value is posted immediately. A returned Promise is
// awaited via `.then`, and the settled value or rejection is posted.
//
// Failures are reported with `postError` on both paths, including failures
// raised by `postResult` itself (e.g. a value that cannot be posted). The
// async path uses a trailing `.catch` rather than the second argument of
// `.then`, because a rejection handler passed to `.then` does not see errors
// thrown by the fulfillment handler of that same call; without it such an
// error would become an unhandled rejection and the caller would never get
// a reply.
function invokeAndRespond(callId: number, fn: Fn, args: unknown[]): void {
  try {
    const raw = fn(...args);
    if (raw instanceof Promise) {
      raw.then((v) => postResult(callId, v)).catch((err) => postError(callId, err));
      return;
    }
    postResult(callId, raw);
  } catch (err) {
    postError(callId, err);
  }
}
247
// Entry point for stream tasks: resolves the function and its arguments,
// calls it, and pipes the returned generator to `msg.port` with
// `pipeIterable` (using flags built from `msg.flags` when present).
//
// A failure is reported with `postError` only when the message carries a
// `callId`; otherwise it is dropped here.
async function handleStreamTask(msg: TaskMsg): Promise<void> {
  const { id, args, port, flags } = msg;
  const callId = msg.callId;
  try {
    const lookup = resolveFn(id);
    const fn = lookup instanceof Promise ? await lookup : lookup;

    // Resolve arguments strictly in order.
    const callArgs: unknown[] = [];
    for (const input of args) {
      callArgs.push(needsAsyncResolve(input) ? await resolveArg(input) : deserializeArg(input));
    }

    const gen = fn(...callArgs) as AsyncGenerator;
    const pipeFlags = flags ? pipeFlagsFromBuffer(flags) : undefined;
    await pipeIterable(gen, port!, { flags: pipeFlags });
  } catch (err) {
    if (callId != null) postError(callId, err);
  }
}
265
// Dispatch every incoming task: messages without a port are value tasks,
// messages with a port are stream tasks.
parentPort!.on('message', (msg: TaskMsg) => {
  if (!msg.port) {
    handleValueTask(msg);
    return;
  }
  // Fire-and-forget: handleStreamTask catches its own errors.
  void handleStreamTask(msg);
});
272});