Offload functions to worker threads with shared memory primitives for Node.js.
1import { parentPort, MessagePort } from 'node:worker_threads';
2import type { Transferable } from 'node:worker_threads';
3import { registry } from './registry.ts';
4import { deserializeArg, serializeArg } from './shared/reconstruct.ts';
5import { collectTransferables } from './transfer.ts';
6
/** Module URLs already loaded through dynamic `import()` in this worker; checked to skip repeat imports. */
const imported = new Set<string>();
/** Results of completed task arguments, keyed by the task's `__task__` number. */
const taskCache = new Map<number, unknown>();
9
/**
 * Type guard for a task descriptor argument.
 *
 * Checks every field the predicate promises, not just the presence of the
 * `__task__` key, so callers can safely use `id` as a string and `args` as an
 * array after narrowing.
 *
 * @param arg - Any incoming argument value.
 * @returns `true` only when `arg` is a non-null object with a numeric
 *   `__task__`, a string `id` and an array `args`.
 */
function isTaskArg(arg: unknown): arg is { __task__: number; id: string; args: unknown[] } {
  if (typeof arg !== 'object' || arg === null) return false;
  const candidate = arg as { __task__?: unknown; id?: unknown; args?: unknown };
  return (
    typeof candidate.__task__ === 'number' &&
    typeof candidate.id === 'string' &&
    Array.isArray(candidate.args)
  );
}
13
/**
 * Adapts a MessagePort into an AsyncIterable.
 *
 * Incoming messages are interpreted as:
 * - `{ error }` — the stream failed; the error is thrown from `next()` once
 *   all buffered values have been consumed.
 * - `{ done: true }` — the stream finished normally.
 * - anything else — `msg.value` is passed through `deserializeArg` and buffered.
 *
 * Backpressure: the string `'pause'` is posted to the port once the buffer
 * holds `HIGH_WATER` values, and `'resume'` once a paused buffer has drained
 * to `LOW_WATER` or fewer.
 *
 * The stream also ends when the port emits `'close'`, so a producer that
 * disconnects without sending `done` cannot leave the consumer waiting forever.
 *
 * Only one `next()` may be pending at a time: a single wake-up slot is kept.
 *
 * @param port - Port to read stream messages from. A `'message'` listener is
 *   attached immediately, not lazily on first iteration.
 * @returns An async iterable over the deserialized values.
 */
function portToAsyncIterable<T>(port: MessagePort): AsyncIterable<T> {
  // Buffer size at which the producer is asked to pause.
  const HIGH_WATER = 16;
  // Buffer size at or below which a paused producer is asked to resume.
  const LOW_WATER = 1;

  const queue: T[] = [];
  let done = false;
  let error: Error | null = null;
  let paused = false;
  let waiting: (() => void) | null = null;

  // Resolve the pending next() call, if there is one.
  const wake = (): void => {
    const resolve = waiting;
    waiting = null;
    resolve?.();
  };

  // Mark the stream as finished and release a pending next().
  const finish = (): void => {
    done = true;
    wake();
  };

  port.on('message', (msg: { value?: unknown; done?: boolean; error?: Error }) => {
    if (msg.error) {
      error = msg.error;
      finish();
      return;
    }
    if (msg.done) {
      finish();
      return;
    }
    queue.push(deserializeArg(msg.value) as T);
    wake();
    if (!paused && queue.length >= HIGH_WATER) {
      paused = true;
      port.postMessage('pause');
    }
  });

  // Without this, a channel that closes before `done`/`error` arrives would
  // leave next() awaiting a wake-up that never comes.
  port.on('close', finish);

  return {
    [Symbol.asyncIterator]() {
      return {
        async next(): Promise<IteratorResult<T>> {
          while (true) {
            // Buffered values are always delivered before an error or completion.
            if (queue.length > 0) {
              const value = queue.shift()!;
              if (paused && queue.length <= LOW_WATER) {
                paused = false;
                port.postMessage('resume');
              }
              return { done: false, value };
            }
            if (error) throw error;
            if (done) return { done: true, value: undefined };
            await new Promise<void>((resolve) => {
              waiting = resolve;
            });
          }
        },
        async return(): Promise<IteratorResult<T>> {
          // The consumer is abandoning the stream: drop buffered state so any
          // pending or later next() reports completion instead of hanging.
          queue.length = 0;
          error = null;
          finish();
          port.close();
          return { done: true, value: undefined };
        },
      };
    },
  };
}
80
/** Task executions that have started but not yet settled, keyed by `__task__`. */
const inFlightTasks = new Map<number, Promise<unknown>>();

/**
 * Turns one incoming argument into the value handed to the target function.
 *
 * - A `MessagePort` becomes an async iterable via `portToAsyncIterable`.
 * - A task descriptor is executed: its own args are resolved recursively, its
 *   module is imported if not already in `imported`, the function is looked up
 *   in `registry`, and the awaited result is stored in `taskCache`.
 * - Anything else is passed through `deserializeArg`.
 *
 * Sibling arguments are resolved concurrently by callers, so two references to
 * the same task can both arrive before the first finishes. The in-flight map
 * makes them share one execution instead of running the task twice. A failed
 * task is not cached, so a later reference retries it.
 *
 * @param arg - Raw argument as received from the message.
 * @returns The resolved argument value.
 * @throws Error with message `Moroutine not found: <id>` when the task's id is
 *   not in the registry after its module has been imported; also propagates
 *   any rejection from the import or from the task function.
 */
async function resolveArg(arg: unknown): Promise<unknown> {
  if (arg instanceof MessagePort) {
    return portToAsyncIterable(arg);
  }
  if (!isTaskArg(arg)) {
    return deserializeArg(arg);
  }

  const taskId = arg.__task__;
  if (taskCache.has(taskId)) {
    return taskCache.get(taskId);
  }

  const running = inFlightTasks.get(taskId);
  if (running) {
    return running;
  }

  const execution = (async (): Promise<unknown> => {
    // Resolve the task's own args recursively
    const resolvedArgs = await Promise.all(arg.args.map(resolveArg));
    // The module URL is everything before the last '#' of the id
    const url = arg.id.slice(0, arg.id.lastIndexOf('#'));
    if (!imported.has(url)) {
      await import(url);
      imported.add(url);
    }
    const fn = registry.get(arg.id);
    if (!fn) throw new Error(`Moroutine not found: ${arg.id}`);
    const value = await fn(...resolvedArgs);
    taskCache.set(taskId, value);
    return value;
  })();

  inFlightTasks.set(taskId, execution);
  try {
    return await execution;
  } finally {
    // Settled either way: later lookups use taskCache (success) or retry (failure).
    inFlightTasks.delete(taskId);
  }
}
105
/** Normalizes any thrown value to an `Error` before it is posted across a port. */
function toWorkerError(err: unknown): Error {
  return err instanceof Error ? err : new Error(String(err));
}

/**
 * Worker entry point: handles one call request per message from the parent.
 *
 * For every message the target module is imported (once per URL), the function
 * is looked up in `registry` by `id`, and all `args` are resolved concurrently
 * with `resolveArg`.
 *
 * - With `msg.port` (streaming call): the function result is iterated as an
 *   async generator and each value is posted on the port as
 *   `{ value, done: false }`. The stream ends with `{ done: true }` or
 *   `{ done: true, error }`, after which the port is closed. The port's
 *   `'pause'` / `'resume'` messages gate production, and a `'close'` event
 *   cancels it.
 * - Without a port: the awaited result is posted to the parent as
 *   `{ callId, value }`, or `{ callId, error }` on failure.
 *
 * A streaming call that fails before the stream starts (import, lookup or
 * argument resolution) is also reported on the stream port, and the port is
 * closed, so the stream is always terminated.
 */
parentPort!.on('message', async (msg: { callId?: number; id: string; args: unknown[]; port?: MessagePort }) => {
  const { id, args, port } = msg;
  try {
    // The module URL is everything before the last '#' of the id
    const url = id.slice(0, id.lastIndexOf('#'));
    if (!imported.has(url)) {
      await import(url);
      imported.add(url);
    }

    const fn = registry.get(id);
    if (!fn) throw new Error(`Moroutine not found: ${id}`);

    const resolvedArgs = await Promise.all(args.map(resolveArg));

    if (port) {
      let paused = false;
      let resumed: (() => void) | null = null;
      let cancelled = false;

      port.on('message', (signal: string) => {
        if (signal === 'pause') {
          paused = true;
        } else if (signal === 'resume') {
          paused = false;
          if (resumed) {
            resumed();
            resumed = null;
          }
        }
      });

      port.on('close', () => {
        cancelled = true;
        // Release a producer parked on backpressure so it can see the cancellation.
        if (resumed) {
          resumed();
          resumed = null;
        }
      });

      try {
        const gen = fn(...resolvedArgs) as AsyncGenerator;
        for await (const value of gen) {
          if (cancelled) break;
          if (paused) {
            await new Promise<void>((r) => {
              resumed = r;
            });
          }
          // Re-check: the wait above is also released by the 'close' handler.
          if (cancelled) break;
          const serialized = serializeArg(value);
          const transferList: Transferable[] = [];
          collectTransferables(value, transferList);
          port.postMessage({ value: serialized, done: false }, transferList);
          // Yield to the event loop between values so port events
          // (pause / resume / close) can be processed.
          await new Promise((r) => setImmediate(r));
        }
        if (!cancelled) port.postMessage({ done: true });
      } catch (err) {
        if (!cancelled) {
          port.postMessage({ done: true, error: toWorkerError(err) });
        }
      }
      try {
        port.close();
      } catch {
        // Best effort: nothing useful can be done if closing fails.
      }
      return;
    }

    const callId = msg.callId!;
    const value = await fn(...resolvedArgs);
    const returnValue = serializeArg(value);
    const transferList: Transferable[] = [];
    collectTransferables(value, transferList);
    parentPort!.postMessage({ callId, value: returnValue }, transferList);
  } catch (err) {
    const error = toWorkerError(err);
    if (port) {
      // Setup failed for a streaming call. Terminate the stream on its own
      // port as well; presumably the consumer only watches that port
      // (NOTE(review): confirm against the parent-side reader).
      try {
        port.postMessage({ done: true, error });
        port.close();
      } catch {
        // Best effort: the parent-port report below still goes out.
      }
    }
    parentPort!.postMessage({ callId: msg.callId!, error });
  }
});
181});