// Offload functions to worker threads with shared memory primitives for Node.js.
1import { parentPort, MessagePort } from 'node:worker_threads';
2import type { Transferable } from 'node:worker_threads';
3import { registry } from './registry.ts';
4import { deserializeArg, serializeArg } from './shared/reconstruct.ts';
5import { collectTransferables } from './transfer.ts';
6import { pipeIterable, CANCEL, DEFAULT_HIGH_WATER, deserializeStreamHandle, isSerializedStreamHandle } from './pipe.ts';
7import type { StreamHandle, SerializedStreamHandle } from './pipe.ts';
8
// Module URLs whose dynamic import has already run (importing a module is
// what populates `registry` with its moroutines, so it must happen once
// before lookup).
const imported = new Set<string>();
// Memoized results of task-graph nodes, keyed by `__task__` id, so a node
// feeding several downstream arguments executes once.
// NOTE(review): never evicted — presumably task ids are unique for the
// worker's lifetime; confirm upstream, otherwise this grows unboundedly.
const taskCache = new Map<number, unknown>();

type Fn = (...args: unknown[]) => unknown;
// Moroutine id → resolved function; lets the hot dispatch path stay synchronous.
const fnCache = new Map<string, Fn>();
14
15function isTaskArg(arg: unknown): arg is { __task__: number; id: string; args: unknown[] } {
16 return typeof arg === 'object' && arg !== null && '__task__' in arg;
17}
18
19function needsAsyncResolve(arg: unknown): boolean {
20 return arg instanceof MessagePort || isSerializedStreamHandle(arg) || isTaskArg(arg);
21}
22
23// Returns the function synchronously when cached — callers must not await
24// unconditionally, or they pay a microtask hop for every dispatch.
25function resolveFn(id: string): Fn | Promise<Fn> {
26 const cached = fnCache.get(id);
27 if (cached) return cached;
28 return resolveFnSlow(id);
29}
30
31async function resolveFnSlow(id: string): Promise<Fn> {
32 const url = id.slice(0, id.lastIndexOf('#'));
33 if (!imported.has(url)) {
34 await import(url);
35 imported.add(url);
36 }
37 const fn = registry.get(id) as Fn | undefined;
38 if (!fn) throw new Error(`Moroutine not found: ${id}`);
39 fnCache.set(id, fn);
40 return fn;
41}
42
/**
 * Wraps the consumer side of a stream dispatch as an AsyncIterable<T>.
 *
 * Values arrive as messages on `handle.port`, are deserialized, and buffered
 * in `queue`; the iterator drains the buffer and signals consumption back to
 * the producer through shared-memory counters:
 *  - `inflight`: produced-but-unconsumed item count, decremented per item.
 *  - `readySignal`: present only in "channel" mode; the producer parks on it.
 *  - `state`: stored to CANCEL when the consumer exits early (`return()`).
 *
 * NOTE(review): a single `waiting` resolver is kept, so overlapping next()
 * calls would overwrite each other and strand the earlier caller — presumably
 * consumers only iterate via `for await`; confirm.
 */
function portToAsyncIterable<T>(handle: StreamHandle): AsyncIterable<T> {
  const { port, highWater, readySignal } = handle;
  const inflight = handle.flags.fields.inflight;
  const state = handle.flags.fields.state;

  // Local buffering state shared between the message listener and iterator.
  const queue: T[] = [];
  let done = false;
  let error: Error | null = null;
  let waiting: (() => void) | null = null;

  // Every message variant wakes a parked next() so it can re-examine state.
  port.on('message', (msg: { value?: unknown; done?: boolean; error?: Error }) => {
    if (msg.error) {
      // Error also terminates the stream; next() drains the queue first,
      // then throws (see the loop ordering below).
      error = msg.error;
      done = true;
      if (waiting) {
        waiting();
        waiting = null;
      }
      return;
    }
    if (msg.done) {
      done = true;
      if (waiting) {
        waiting();
        waiting = null;
      }
      return;
    }
    queue.push(deserializeArg(msg.value) as T);
    if (waiting) {
      waiting();
      waiting = null;
    }
  });

  return {
    [Symbol.asyncIterator]() {
      return {
        async next(): Promise<IteratorResult<T>> {
          while (true) {
            // Buffered items are served before done/error is honored.
            if (queue.length > 0) {
              const value = queue.shift()!;
              // Atomics-based backpressure: decrement inflight. In stream
              // mode the producer parks on inflight, so we notify it; in
              // channel mode the producer parks on readySignal, and we
              // only wake it on the cap→below-cap transition.
              const prev = inflight.sub(1);
              if (readySignal) {
                if (prev === highWater) {
                  readySignal.add(1);
                  readySignal.notify();
                }
              } else {
                inflight.notify();
              }
              return { done: false, value };
            }
            if (error) throw error;
            if (done) return { done: true, value: undefined };
            // Park until the message listener wakes us.
            await new Promise<void>((resolve) => {
              waiting = resolve;
            });
          }
        },
        // Early exit (break / return from a for-await loop): flag CANCEL and
        // wake the producer on whichever signal it may be parked on, so it
        // can observe the cancellation instead of blocking forever.
        async return(): Promise<IteratorResult<T>> {
          state.store(CANCEL);
          inflight.notify();
          if (readySignal) {
            readySignal.add(1);
            readySignal.notify();
          }
          port.close();
          return { done: true, value: undefined };
        },
      };
    },
  };
}
121
122async function resolveArg(arg: unknown): Promise<unknown> {
123 if (isSerializedStreamHandle(arg)) {
124 return portToAsyncIterable(deserializeStreamHandle(arg));
125 }
126 if (isTaskArg(arg)) {
127 if (taskCache.has(arg.__task__)) {
128 return taskCache.get(arg.__task__);
129 }
130 const inputArgs = arg.args;
131 const resolvedArgs = new Array(inputArgs.length);
132 for (let i = 0; i < inputArgs.length; i++) {
133 const a = inputArgs[i];
134 resolvedArgs[i] = needsAsyncResolve(a) ? await resolveArg(a) : deserializeArg(a);
135 }
136 const fnM = resolveFn(arg.id);
137 const fn = fnM instanceof Promise ? await fnM : fnM;
138 const raw = fn(...resolvedArgs);
139 const value = raw instanceof Promise ? await raw : raw;
140 taskCache.set(arg.__task__, value);
141 return value;
142 }
143 return deserializeArg(arg);
144}
145
/** Wire format of a dispatch message received from the main thread. */
type TaskMsg = {
  /** Correlation id echoed back alongside the result or error. */
  callId?: number;
  /** Moroutine id, "<moduleUrl>#<name>" (see resolveFnSlow). */
  id: string;
  /** Serialized arguments; resolved/deserialized before invocation. */
  args: unknown[];
  /** Present on streaming dispatches; bundles port + flags + highWater. */
  stream?: SerializedStreamHandle;
};
153
154function postResult(callId: number, value: unknown): void {
155 const returnValue = serializeArg(value);
156 const transferList: Transferable[] = [];
157 collectTransferables(value, transferList);
158 parentPort!.postMessage({ callId, value: returnValue }, transferList);
159}
160
161function postError(callId: number, err: unknown): void {
162 parentPort!.postMessage({
163 callId,
164 error: err instanceof Error ? err : new Error(String(err)),
165 });
166}
167
168// Value-task path, structured to pay zero microtask hops when the whole
169// chain (fn resolution, arg resolution, function invocation) is sync.
170function handleValueTask(msg: TaskMsg): void {
171 const callId = msg.callId!;
172 try {
173 const fnM = resolveFn(msg.id);
174 if (fnM instanceof Promise) {
175 fnM.then(
176 (fn) => invokeWithArgs(callId, fn, msg.args),
177 (err) => postError(callId, err),
178 );
179 return;
180 }
181 invokeWithArgs(callId, fnM, msg.args);
182 } catch (err) {
183 postError(callId, err);
184 }
185}
186
/**
 * Deserializes `args`, invokes `fn`, and posts the outcome under `callId`.
 *
 * Deliberately split into a synchronous prologue and an async tail: when no
 * argument needs async resolution, dispatch completes with zero microtask
 * hops (see the note above handleValueTask). Do not "simplify" this into a
 * single async loop — that reintroduces a hop per dispatch.
 */
function invokeWithArgs(callId: number, fn: Fn, args: unknown[]): void {
  try {
    // Sync prologue: walk args in-place until we hit one that needs async
    // resolution. If we finish the loop synchronously, go straight to invoke.
    const resolved = new Array(args.length);
    let i = 0;
    for (; i < args.length; i++) {
      const arg = args[i];
      if (needsAsyncResolve(arg)) break;
      resolved[i] = deserializeArg(arg);
    }
    if (i === args.length) {
      invokeAndRespond(callId, fn, resolved);
      return;
    }
    // Async tail: same loop body, awaiting where needed. `i` carries over,
    // so already-deserialized prefix slots are not reprocessed.
    (async () => {
      for (; i < args.length; i++) {
        const arg = args[i];
        resolved[i] = needsAsyncResolve(arg) ? await resolveArg(arg) : deserializeArg(arg);
      }
      invokeAndRespond(callId, fn, resolved);
    })().catch((err) => postError(callId, err));
  } catch (err) {
    // Synchronous failures (bad arg shapes, deserialize throws) also reply.
    postError(callId, err);
  }
}
214
215function invokeAndRespond(callId: number, fn: Fn, args: unknown[]): void {
216 try {
217 const raw = fn(...args);
218 if (raw instanceof Promise) {
219 raw.then(
220 (v) => postResult(callId, v),
221 (err) => postError(callId, err),
222 );
223 return;
224 }
225 postResult(callId, raw);
226 } catch (err) {
227 postError(callId, err);
228 }
229}
230
231async function handleStreamTask(msg: TaskMsg): Promise<void> {
232 const { id, args } = msg;
233 const callId = msg.callId;
234 try {
235 const fnM = resolveFn(id);
236 const fn = fnM instanceof Promise ? await fnM : fnM;
237 const resolvedArgs = new Array(args.length);
238 for (let i = 0; i < args.length; i++) {
239 const a = args[i];
240 resolvedArgs[i] = needsAsyncResolve(a) ? await resolveArg(a) : deserializeArg(a);
241 }
242 const gen = fn(...resolvedArgs) as AsyncGenerator;
243 await pipeIterable(gen, deserializeStreamHandle(msg.stream!));
244 } catch (err) {
245 if (callId != null) postError(callId, err);
246 }
247}
248
249parentPort!.on('message', (msg: TaskMsg) => {
250 if (msg.stream) {
251 void handleStreamTask(msg);
252 } else {
253 handleValueTask(msg);
254 }
255});