Offload functions to worker threads with shared memory primitives for Node.js.
1import type { Worker } from 'node:worker_threads';
2import { MessageChannel } from 'node:worker_threads';
3import type { MessagePort, Transferable } from 'node:worker_threads';
4import { transferableAbortSignal } from 'node:util';
5import { freezeModule } from './registry.ts';
6import { serializeArg, deserializeArg } from './shared/reconstruct.ts';
7import { extractTransferables, collectTransferables } from './transfer.ts';
8import { PromiseLikeTask } from './task.ts';
9import { AsyncIterableTask } from './stream-task.ts';
10import { runStreamOnDedicated } from './dedicated-runner.ts';
11import { CHANNEL, Channel } from './channel.ts';
12import { pipeIterable, newPipeFlags, CANCEL } from './pipe.ts';
13import type { ChannelOptions } from './channel.ts';
14
/** Source of call ids; `execute` post-increments it once per dispatched call. */
let nextCallId = 0;

/**
 * Settle callbacks of calls that are still awaiting a reply, keyed by call id.
 * `execute` adds an entry; the `message` handler installed by `setupWorker`
 * removes it when the matching reply arrives.
 */
const pending: Map<
  number,
  { resolve: (value: any) => void; reject: (reason: any) => void }
> = new Map();

/**
 * Stack of transfer-list frames. A dispatch pushes an empty frame before it
 * prepares its arguments and pops it afterwards; `prepareArg` appends to the
 * frame on top of the stack.
 */
const streamPortStack: Array<Array<MessagePort>> = [];
18
/**
 * Subscribes to `worker`'s `message` events and settles the matching entry in
 * `pending` for every reply.
 *
 * A reply whose `callId` has no pending entry is ignored. Otherwise the entry
 * is removed, then rejected with `error` when that field is anything other
 * than `undefined`, or resolved with the deserialized `value`.
 *
 * @param worker - Worker whose replies should settle pending calls.
 */
export function setupWorker(worker: Worker): void {
  const onReply = (reply: { callId: number; value?: unknown; error?: Error }): void => {
    const entry = pending.get(reply.callId);
    if (!entry) {
      return;
    }

    // Remove first so the entry is gone no matter how settling goes.
    pending.delete(reply.callId);

    if (reply.error === undefined) {
      entry.resolve(deserializeArg(reply.value));
      return;
    }
    entry.reject(reply.error);
  };

  worker.on('message', onReply);
}
31
/**
 * High-water mark used when the caller supplies none; also the value handed to
 * `pipeToPort` for argument streams created by `prepareArg`.
 */
const DEFAULT_HIGH_WATER = 16;

/**
 * Low-water threshold.
 * NOTE(review): nothing in the visible part of this file reads it — confirm
 * whether it is still needed.
 */
const LOW_WATER = 1;
34
/**
 * Starts piping `iterable` into `port` and returns immediately.
 *
 * Delegates to `pipeIterable` with `extractTransfers` enabled and `yieldEvery`
 * set to `highWaterMark`. The promise returned by `pipeIterable` is
 * deliberately discarded (fire-and-forget).
 *
 * @param iterable - Source of the values to send.
 * @param port - Port the values are written to.
 * @param highWaterMark - Forwarded to `pipeIterable` as `yieldEvery`.
 */
function pipeToPort(
  iterable: AsyncIterable<unknown>,
  port: MessagePort,
  highWaterMark: number,
): void {
  void pipeIterable(iterable, port, {
    extractTransfers: true,
    yieldEvery: highWaterMark,
  });
}
38
/**
 * Reports whether `arg` looks like an async-generator object.
 *
 * The check is purely tag based: `arg` must be a non-null object whose
 * `Symbol.toStringTag` property equals `'AsyncGenerator'`.
 *
 * @param arg - Any value.
 * @returns `true` when the tag matches, `false` otherwise.
 */
function isAsyncGenerator(arg: unknown): boolean {
  if (arg === null || typeof arg !== 'object') {
    return false;
  }
  const tag: unknown = Reflect.get(arg, Symbol.toStringTag);
  return tag === 'AsyncGenerator';
}
42
/**
 * Returns the transfer-list frame of the dispatch currently in progress.
 *
 * @throws Error when no frame has been pushed onto `streamPortStack`.
 */
function currentTransferFrame(): MessagePort[] {
  const frame = streamPortStack[streamPortStack.length - 1];
  if (frame === undefined) {
    throw new Error('prepareArg called outside of an active dispatch: no transfer frame to append to');
  }
  return frame;
}

/**
 * Pipes `iterable` into a fresh `MessageChannel`, records the far end in
 * `frame` so it gets transferred, and returns that far end.
 */
function pipeThroughChannel(iterable: AsyncIterable<unknown>, frame: MessagePort[]): MessagePort {
  const { port1, port2 } = new MessageChannel();
  // The local end is unref'ed so it does not keep the process alive by itself.
  port1.unref();
  pipeToPort(iterable, port1, DEFAULT_HIGH_WATER);
  frame.push(port2);
  return port2;
}

/**
 * Converts one call argument into the form that is posted to the worker.
 *
 * Recognised argument kinds, checked in this order:
 * - `AbortSignal`: marked transferable and added to the current frame.
 * - async-generator object: piped through a new `MessageChannel`; the remote
 *   port is returned and added to the current frame.
 * - `AsyncIterableTask`: started via `runStreamOnDedicated`, then piped like
 *   an async generator.
 * - `Channel`: a consumer is registered; its port is added to the current
 *   frame and a `{ __stream__: true, port, flags, readySignal }` descriptor is
 *   returned.
 * - `PromiseLikeTask`: returned as a `{ __task__, id, args }` descriptor whose
 *   args are prepared recursively.
 * - anything else: passed through `serializeArg`.
 *
 * Every branch that needs the transfer frame resolves it before doing any
 * other work, so a call made outside a dispatch fails without having created a
 * channel, started a pipe, or registered a consumer.
 *
 * @param arg - The argument to prepare.
 * @returns The value to place in the outgoing message.
 * @throws Error when a transferable argument is prepared while
 *   `streamPortStack` is empty.
 */
function prepareArg(arg: unknown): unknown {
  if (arg instanceof AbortSignal) {
    const frame = currentTransferFrame();
    const signal = transferableAbortSignal(arg);
    // The frame is typed for ports, but it is only ever spread into a transfer list.
    frame.push(signal as unknown as MessagePort);
    return signal;
  }

  if (isAsyncGenerator(arg)) {
    const frame = currentTransferFrame();
    return pipeThroughChannel(arg as AsyncIterable<unknown>, frame);
  }

  if (arg instanceof AsyncIterableTask) {
    const frame = currentTransferFrame();
    return pipeThroughChannel(runStreamOnDedicated(arg.id, arg.args), frame);
  }

  if (arg instanceof Channel) {
    const frame = currentTransferFrame();
    const { port, flags, readySignal } = arg.addConsumer();
    frame.push(port);
    return { __stream__: true, port, flags, readySignal };
  }

  if (arg instanceof PromiseLikeTask) {
    return { __task__: arg.uid, id: arg.id, args: arg.args.map(prepareArg) };
  }

  return serializeArg(arg);
}
78
/**
 * Runs the function identified by `id` on `worker` and resolves with its
 * result.
 *
 * The module part of `id` (everything before the last `#`) is passed to
 * `freezeModule`, the arguments are prepared for posting, and a
 * `{ callId, id, args }` message is sent to the worker. The returned promise
 * is settled later by the `message` handler installed with `setupWorker`.
 *
 * If preparing the arguments or posting the message throws, the promise
 * rejects with that error, the call is removed from `pending`, and the
 * transfer frame pushed for this call is popped, so a failed dispatch leaves
 * no module state behind.
 *
 * @param worker - Worker that should run the call.
 * @param id - Function id; the text before the last `#` is treated as the module URL.
 * @param args - Arguments for the remote function.
 * @returns A promise for the deserialized result of the remote call.
 */
export function execute<T>(worker: Worker, id: string, args: unknown[]): Promise<T> {
  const url = id.slice(0, id.lastIndexOf('#'));
  freezeModule(url);
  const callId = nextCallId++;

  return new Promise<T>((resolve, reject) => {
    pending.set(callId, { resolve, reject });
    try {
      const extracted = extractTransferables(args);

      // prepareArg appends transferables to the frame on top of the stack.
      streamPortStack.push([]);
      let preparedArgs: unknown[];
      let ports: MessagePort[];
      try {
        preparedArgs = extracted.args.map(prepareArg);
      } finally {
        // Unwind even when an argument fails to prepare, so the stack stays balanced.
        ports = streamPortStack.pop() ?? [];
      }

      const msg = { callId, id, args: preparedArgs };
      worker.postMessage(msg, [...extracted.transfer, ...ports] as any[]);
    } catch (err) {
      // No reply will ever arrive for this call; do not leave it registered.
      pending.delete(callId);
      reject(err);
    }
  });
}
93
/** Handle returned by `dispatchStream` for one streaming call. */
export interface StreamDispatch<T> {
  /** Async iterable over the values received from the worker. */
  iterable: AsyncIterable<T>;

  /**
   * Resolves once the stream is over. `dispatchStream` resolves it when the
   * worker reports completion or an error, and when the consumer cancels via
   * the iterator's `return()`; it never rejects it.
   */
  done: Promise<void>;
}
98
/**
 * Starts the streaming function identified by `id` on `worker` and exposes its
 * output as an async iterable.
 *
 * A dedicated `MessageChannel` carries the stream: the worker receives one
 * port together with the shared flags buffer, and this side listens on the
 * other. Backpressure is atomics based: each value handed to the consumer
 * decrements `flags.inflight` and notifies it, and cancelling stores `CANCEL`
 * in `flags.state`.
 *
 * Packets on the port are `{ value }`, `{ done: true }` or `{ error }`. Values
 * already queued are still delivered before an error is thrown or completion
 * is reported.
 *
 * Calling the iterator's `return()` cancels the producer, drops queued values,
 * closes the port, and wakes every `next()` call that is still waiting, so
 * pending and later `next()` calls resolve with `{ done: true }` instead of
 * hanging. Several `next()` calls may wait at the same time.
 *
 * NOTE(review): `opts.highWaterMark` is accepted but nothing in this function
 * forwards it to the producer — confirm how the worker side is meant to learn
 * the limit.
 *
 * @param worker - Worker that should run the stream.
 * @param id - Function id; the text before the last `#` is treated as the module URL.
 * @param args - Arguments for the remote function.
 * @param opts - Channel options (see the review note above).
 * @returns The iterable plus a `done` promise that resolves when the stream
 *   ends, fails, or is cancelled.
 * @throws Whatever argument preparation or `worker.postMessage` throws; in that
 *   case the channel is closed and the transfer frame is popped first.
 */
export function dispatchStream<T>(
  worker: Worker,
  id: string,
  args: unknown[],
  opts?: ChannelOptions,
): StreamDispatch<T> {
  const url = id.slice(0, id.lastIndexOf('#'));
  freezeModule(url);

  const { port1, port2 } = new MessageChannel();
  // Shared inflight counter + cancel flag used for backpressure.
  const flags = newPipeFlags();

  try {
    const extracted = extractTransferables(args);

    // prepareArg appends transferables to the frame on top of the stack.
    streamPortStack.push([]);
    let preparedArgs: unknown[];
    let ports: MessagePort[];
    try {
      preparedArgs = extracted.args.map(prepareArg);
    } finally {
      // Unwind even when an argument fails to prepare, so the stack stays balanced.
      ports = streamPortStack.pop() ?? [];
    }

    const request = { id, args: preparedArgs, port: port2, flags: flags.buffer };
    worker.postMessage(request, [...extracted.transfer, ...ports, port2] as any[]);
  } catch (err) {
    // The stream never started; closing one end releases the whole channel.
    port1.close();
    throw err;
  }

  let resolveDone: () => void = () => {};
  const donePromise = new Promise<void>((r) => {
    resolveDone = r;
  });

  const queue: T[] = [];
  let finished = false;
  let failure: Error | null = null;
  let waiters: Array<() => void> = [];

  // Wakes every parked next(); each one re-checks the state in its loop.
  const wakeWaiters = (): void => {
    const parked = waiters;
    waiters = [];
    for (const wake of parked) wake();
  };

  // Shared teardown for completion, failure and cancellation.
  const finish = (): void => {
    finished = true;
    port1.close();
    resolveDone();
    wakeWaiters();
  };

  port1.on('message', (packet: { value?: unknown; done?: boolean; error?: Error }) => {
    // Ignore anything delivered after the stream was finished or cancelled.
    if (finished) return;
    if (packet.error) {
      failure = packet.error;
      finish();
      return;
    }
    if (packet.done) {
      finish();
      return;
    }
    queue.push(deserializeArg(packet.value) as T);
    wakeWaiters();
  });
  port1.unref();

  return {
    iterable: {
      [Symbol.asyncIterator]() {
        return {
          async next(): Promise<IteratorResult<T>> {
            while (true) {
              if (queue.length > 0) {
                const value = queue.shift() as T;
                // Signal consumption to the producer.
                flags.inflight.sub(1);
                flags.inflight.notify();
                return { done: false, value };
              }
              if (failure) throw failure;
              if (finished) return { done: true, value: undefined };
              await new Promise<void>((resolve) => {
                waiters.push(resolve);
              });
            }
          },
          async return(): Promise<IteratorResult<T>> {
            flags.state.store(CANCEL);
            flags.inflight.notify();
            // A cancelled stream yields nothing further.
            queue.length = 0;
            finish();
            return { done: true, value: undefined };
          },
        };
      },
    },
    done: donePromise,
  };
}
195}