Offload functions to worker threads with shared memory primitives for Node.js.
1import type { Worker } from 'node:worker_threads';
2import { MessageChannel } from 'node:worker_threads';
3import type { MessagePort, Transferable } from 'node:worker_threads';
4import { transferableAbortSignal } from 'node:util';
5import { freezeModule } from './registry.ts';
6import { serializeArg, deserializeArg } from './shared/reconstruct.ts';
7import { extractTransferables, collectTransferables } from './transfer.ts';
8import { PromiseLikeTask } from './task.ts';
9import { AsyncIterableTask } from './stream-task.ts';
10import { runStreamOnDedicated } from './dedicated-runner.ts';
11import { Channel } from './channel.ts';
12import { pipeIterable, newPipeFlags, CANCEL, DEFAULT_HIGH_WATER, serializeStreamHandle } from './pipe.ts';
13import type { StreamHandle, SerializedStreamHandle } from './pipe.ts';
14import type { ChannelOptions } from './channel.ts';
15
16let nextCallId = 0;
17const pending = new Map<number, { resolve: (value: any) => void; reject: (reason: any) => void }>();
18const streamPortStack: MessagePort[][] = [];
19
20export function setupWorker(worker: Worker): void {
21 worker.on('message', (msg: { callId: number; value?: unknown; error?: Error }) => {
22 const call = pending.get(msg.callId);
23 if (!call) return;
24 pending.delete(msg.callId);
25 if (msg.error !== undefined) {
26 call.reject(msg.error);
27 } else {
28 call.resolve(deserializeArg(msg.value));
29 }
30 });
31}
32
33function isAsyncGenerator(arg: unknown): boolean {
34 return typeof arg === 'object' && arg !== null && (arg as any)[Symbol.toStringTag] === 'AsyncGenerator';
35}
36
37/**
38 * Pipes an AsyncIterable from the parent thread to a worker-facing MessagePort.
39 * Creates a producer-side pipeIterable loop with atomics backpressure and
40 * returns a serialized consumer-side handle for the worker.
41 */
42function pipeArgToWorker(iterable: AsyncIterable<unknown>): SerializedStreamHandle {
43 const { port1, port2 } = new MessageChannel();
44 port1.unref();
45 const flags = newPipeFlags();
46 const producerHandle: StreamHandle = { port: port1, flags, highWater: DEFAULT_HIGH_WATER };
47 void pipeIterable(iterable, producerHandle, { extractTransfers: true });
48 streamPortStack[streamPortStack.length - 1].push(port2);
49 return serializeStreamHandle({ port: port2, flags, highWater: DEFAULT_HIGH_WATER });
50}
51
52function prepareArg(arg: unknown): unknown {
53 // Auto-detect AbortSignal args — mark transferable and include in transfer list
54 if (arg instanceof AbortSignal) {
55 const signal = transferableAbortSignal(arg);
56 streamPortStack[streamPortStack.length - 1].push(signal as unknown as MessagePort);
57 return signal;
58 }
59 // Auto-detect AsyncGenerator args — pipe via MessageChannel + atomics backpressure
60 if (isAsyncGenerator(arg)) {
61 return pipeArgToWorker(arg as AsyncIterable<unknown>);
62 }
63 // Auto-detect StreamTask args — dispatch to dedicated worker, pipe output
64 if (arg instanceof AsyncIterableTask) {
65 return pipeArgToWorker(runStreamOnDedicated(arg.id, arg.args) as AsyncIterable<unknown>);
66 }
67 // Channel wrapper — single distributor loop + per-consumer atomics backpressure.
68 if (arg instanceof Channel) {
69 const handle = arg.addConsumer();
70 streamPortStack[streamPortStack.length - 1].push(handle.port);
71 return serializeStreamHandle(handle);
72 }
73 if (arg instanceof PromiseLikeTask) {
74 return { __task__: arg.uid, id: arg.id, args: arg.args.map(prepareArg) };
75 }
76 return serializeArg(arg);
77}
78
79export function execute<T>(worker: Worker, id: string, args: unknown[]): Promise<T> {
80 const url = id.slice(0, id.lastIndexOf('#'));
81 freezeModule(url);
82 const callId = nextCallId++;
83 return new Promise<T>((resolve, reject) => {
84 pending.set(callId, { resolve, reject });
85 const extracted = extractTransferables(args);
86 streamPortStack.push([]);
87 const preparedArgs = extracted.args.map(prepareArg);
88 const ports = streamPortStack.pop()!;
89 const msg = { callId, id, args: preparedArgs };
90 worker.postMessage(msg, [...extracted.transfer, ...ports] as any[]);
91 });
92}
93
94export interface StreamDispatch<T> {
95 iterable: AsyncIterable<T>;
96 done: Promise<void>;
97}
98
99export function dispatchStream<T>(
100 worker: Worker,
101 id: string,
102 args: unknown[],
103 opts?: ChannelOptions,
104): StreamDispatch<T> {
105 const url = id.slice(0, id.lastIndexOf('#'));
106 freezeModule(url);
107 const highWater = opts?.highWaterMark ?? DEFAULT_HIGH_WATER;
108
109 const { port1, port2 } = new MessageChannel();
110 // Atomics-based backpressure: shared inflight count + cancel flag.
111 // Worker parks on flags.inflight when it hits highWater; parent decrements
112 // + notifies on pull. Replaces pause/resume messages and the
113 // adaptive-yield setImmediate dance.
114 const flags = newPipeFlags();
115 const handle: StreamHandle = { port: port2, flags, highWater };
116
117 const extracted = extractTransferables(args);
118 streamPortStack.push([]);
119 const preparedArgs = extracted.args.map(prepareArg);
120 const ports = streamPortStack.pop()!;
121 const msg = { id, args: preparedArgs, stream: serializeStreamHandle(handle) };
122 worker.postMessage(msg, [...extracted.transfer, ...ports, port2] as any[]);
123
124 let resolveDone: () => void;
125 const donePromise = new Promise<void>((r) => {
126 resolveDone = r;
127 });
128
129 const queue: T[] = [];
130 let done = false;
131 let error: Error | null = null;
132 let waiting: (() => void) | null = null;
133
134 port1.on('message', (msg: { value?: unknown; done?: boolean; error?: Error }) => {
135 if (msg.error) {
136 error = msg.error;
137 done = true;
138 port1.close();
139 resolveDone!();
140 if (waiting) {
141 waiting();
142 waiting = null;
143 }
144 return;
145 }
146 if (msg.done) {
147 done = true;
148 port1.close();
149 resolveDone!();
150 if (waiting) {
151 waiting();
152 waiting = null;
153 }
154 return;
155 }
156 queue.push(deserializeArg(msg.value) as T);
157 if (waiting) {
158 waiting();
159 waiting = null;
160 }
161 });
162 port1.unref();
163
164 return {
165 iterable: {
166 [Symbol.asyncIterator]() {
167 return {
168 async next(): Promise<IteratorResult<T>> {
169 while (true) {
170 if (queue.length > 0) {
171 const value = queue.shift()!;
172 // Signal consumption to producer
173 flags.fields.inflight.sub(1);
174 flags.fields.inflight.notify();
175 return { done: false, value };
176 }
177 if (error) throw error;
178 if (done) return { done: true, value: undefined };
179 await new Promise<void>((resolve) => {
180 waiting = resolve;
181 });
182 }
183 },
184 async return(): Promise<IteratorResult<T>> {
185 flags.fields.state.store(CANCEL);
186 flags.fields.inflight.notify();
187 port1.close();
188 resolveDone!();
189 return { done: true, value: undefined };
190 },
191 };
192 },
193 },
194 done: donePromise,
195 };
196}