import type { Worker } from 'node:worker_threads';
import { MessageChannel } from 'node:worker_threads';
import type { MessagePort, Transferable } from 'node:worker_threads';
import { transferableAbortSignal } from 'node:util';
import { freezeModule } from './registry.ts';
import { serializeArg, deserializeArg } from './shared/reconstruct.ts';
import { extractTransferables, collectTransferables } from './transfer.ts';
import { PromiseLikeTask } from './task.ts';
import { AsyncIterableTask } from './stream-task.ts';
import { runStreamOnDedicated } from './dedicated-runner.ts';
import { Channel } from './channel.ts';
import { pipeIterable, newPipeFlags, CANCEL, DEFAULT_HIGH_WATER, serializeStreamHandle } from './pipe.ts';
import type { StreamHandle, SerializedStreamHandle } from './pipe.ts';
import type { ChannelOptions } from './channel.ts';

/** Counter used to give every `execute` call a unique id for reply correlation. */
let nextCallId = 0;

/** Settle callbacks of in-flight `execute` calls, keyed by call id. */
const pending = new Map<number, { resolve: (value: any) => void; reject: (reason: any) => void }>();

/**
 * Stack of transfer lists under construction. Each dispatch pushes one array
 * before preparing its arguments; `prepareArg` appends every port (or
 * transferable signal) it creates to the top array so the dispatcher can add
 * them to the `postMessage` transfer list.
 */
const streamPortStack: MessagePort[][] = [];

/**
 * Subscribes to a worker's reply messages and settles the matching pending
 * `execute` call. Replies whose `callId` is not pending are ignored.
 *
 * @param worker - Worker whose `'message'` events carry call results.
 */
export function setupWorker(worker: Worker): void {
  worker.on('message', (msg: { callId: number; value?: unknown; error?: Error }) => {
    const call = pending.get(msg.callId);
    if (!call) return;
    pending.delete(msg.callId);
    if (msg.error !== undefined) {
      call.reject(msg.error);
      return;
    }
    // The entry is already removed, so a deserialization failure must reject
    // the call here; otherwise the caller's promise would never settle.
    let value: unknown;
    try {
      value = deserializeArg(msg.value);
    } catch (err) {
      call.reject(err);
      return;
    }
    call.resolve(value);
  });
}

/** Returns true when `arg` is an object tagged `AsyncGenerator` via `Symbol.toStringTag`. */
function isAsyncGenerator(arg: unknown): arg is AsyncGenerator<unknown> {
  return (
    typeof arg === 'object' &&
    arg !== null &&
    (arg as any)[Symbol.toStringTag] === 'AsyncGenerator'
  );
}

/**
 * Returns the transfer list of the dispatch currently preparing arguments.
 *
 * @throws Error when no dispatch is active (the stack is empty).
 */
function currentTransferList(): MessagePort[] {
  const top = streamPortStack[streamPortStack.length - 1];
  if (top === undefined) {
    throw new Error('prepareArg called outside of an active dispatch');
  }
  return top;
}

/**
 * Derives the module URL from a task id of the form `<url>#<name>`.
 * An id without `#` is returned unchanged rather than losing its last character.
 */
function moduleUrlOf(id: string): string {
  const hashIndex = id.lastIndexOf('#');
  return hashIndex === -1 ? id : id.slice(0, hashIndex);
}

/**
 * Pipes an AsyncIterable from the parent thread to a worker-facing MessagePort.
 * Creates a producer-side pipeIterable loop with atomics backpressure and
 * returns a serialized consumer-side handle for the worker.
 */
function pipeArgToWorker(iterable: AsyncIterable<unknown>): SerializedStreamHandle {
  const { port1, port2 } = new MessageChannel();
  port1.unref();
  const flags = newPipeFlags();
  const producerHandle: StreamHandle = { port: port1, flags, highWater: DEFAULT_HIGH_WATER };
  void pipeIterable(iterable, producerHandle, { extractTransfers: true });
  // The consumer end travels to the worker, so it must be in the transfer list.
  currentTransferList().push(port2);
  return serializeStreamHandle({ port: port2, flags, highWater: DEFAULT_HIGH_WATER });
}

/**
 * Converts one call argument into the form posted to the worker. Any port or
 * transferable signal created along the way is appended to the active
 * dispatch's transfer list.
 */
function prepareArg(arg: unknown): unknown {
  // Auto-detect AbortSignal args — mark transferable and include in transfer list
  if (arg instanceof AbortSignal) {
    const signal = transferableAbortSignal(arg);
    currentTransferList().push(signal as unknown as MessagePort);
    return signal;
  }
  // Auto-detect AsyncGenerator args — pipe via MessageChannel + atomics backpressure
  if (isAsyncGenerator(arg)) {
    return pipeArgToWorker(arg);
  }
  // Auto-detect StreamTask args — dispatch to dedicated worker, pipe output
  if (arg instanceof AsyncIterableTask) {
    return pipeArgToWorker(runStreamOnDedicated(arg.id, arg.args) as AsyncIterable<unknown>);
  }
  // Channel wrapper — single distributor loop + per-consumer atomics backpressure.
  if (arg instanceof Channel) {
    const handle = arg.addConsumer();
    currentTransferList().push(handle.port);
    return serializeStreamHandle(handle);
  }
  // Nested task: sent as a descriptor whose own args are prepared recursively.
  if (arg instanceof PromiseLikeTask) {
    return { __task__: arg.uid, id: arg.id, args: arg.args.map(prepareArg) };
  }
  return serializeArg(arg);
}

/**
 * Prepares all arguments of one dispatch and gathers the transfer list
 * (transferables extracted from the args plus ports created by `prepareArg`).
 * The stack frame is popped even when preparation throws, so a failed call
 * cannot leave a stale frame behind.
 */
function prepareCall(args: unknown[]): { args: unknown[]; transfer: any[] } {
  const extracted = extractTransferables(args);
  const ports: MessagePort[] = [];
  streamPortStack.push(ports);
  try {
    const preparedArgs = extracted.args.map(prepareArg);
    // `any[]` mirrors the original cast: the list mixes extracted transferables
    // with ports and transferable AbortSignals.
    return { args: preparedArgs, transfer: [...extracted.transfer, ...ports] as any[] };
  } finally {
    streamPortStack.pop();
  }
}

/**
 * Posts a call to `worker` and resolves with the deserialized reply.
 * `setupWorker` must have been called on the worker for the reply to be seen.
 *
 * @param worker - Target worker.
 * @param id - Task id of the form `<moduleUrl>#<name>`; the module is frozen first.
 * @param args - Call arguments; see `prepareArg` for the special-cased kinds.
 * @returns Promise settled by the worker's reply. It rejects if argument
 *   preparation or `postMessage` throws.
 */
export function execute<T>(worker: Worker, id: string, args: unknown[]): Promise<T> {
  freezeModule(moduleUrlOf(id));
  const callId = nextCallId++;
  return new Promise<T>((resolve, reject) => {
    pending.set(callId, { resolve, reject });
    try {
      const prepared = prepareCall(args);
      const msg = { callId, id, args: prepared.args };
      worker.postMessage(msg, prepared.transfer);
    } catch (err) {
      // Nothing was sent, so no reply will ever arrive: drop the entry.
      pending.delete(callId);
      reject(err);
    }
  });
}

/** Result of `dispatchStream`: the value stream plus a completion signal. */
export interface StreamDispatch<T> {
  /** Values received from the worker, in arrival order. */
  iterable: AsyncIterable<T>;
  /** Resolves when the stream ends, errors, or is cancelled via `return()`. */
  done: Promise<void>;
}

/**
 * Starts a streaming task on `worker` and exposes its output as an async iterable.
 *
 * @param worker - Target worker.
 * @param id - Task id of the form `<moduleUrl>#<name>`; the module is frozen first.
 * @param args - Call arguments; see `prepareArg` for the special-cased kinds.
 * @param opts - Optional `highWaterMark`; defaults to `DEFAULT_HIGH_WATER`.
 * @returns The iterable and a `done` promise.
 * @throws Whatever argument preparation or `postMessage` throws; the stream's
 *   channel is closed before rethrowing.
 */
export function dispatchStream<T>(
  worker: Worker,
  id: string,
  args: unknown[],
  opts?: ChannelOptions,
): StreamDispatch<T> {
  freezeModule(moduleUrlOf(id));
  const highWater = opts?.highWaterMark ?? DEFAULT_HIGH_WATER;
  const { port1, port2 } = new MessageChannel();
  // Atomics-based backpressure: shared inflight count + cancel flag.
  // Worker parks on flags.inflight when it hits highWater; parent decrements
  // + notifies on pull. Replaces pause/resume messages and the
  // adaptive-yield setImmediate dance.
  const flags = newPipeFlags();
  const handle: StreamHandle = { port: port2, flags, highWater };

  try {
    const prepared = prepareCall(args);
    const msg = { id, args: prepared.args, stream: serializeStreamHandle(handle) };
    worker.postMessage(msg, [...prepared.transfer, port2]);
  } catch (err) {
    // The worker never received port2; close the channel instead of leaking it.
    port1.close();
    throw err;
  }

  let resolveDone!: () => void;
  const donePromise = new Promise<void>((r) => {
    resolveDone = r;
  });
  const queue: T[] = [];
  let done = false;
  let error: Error | null = null;
  // Every parked `next()` call registers here; a single slot would let a
  // second concurrent `next()` overwrite (and strand) the first.
  const waiters: Array<() => void> = [];

  /** Resumes every parked `next()` so it re-checks queue/error/done. */
  const wake = (): void => {
    for (const resume of waiters.splice(0)) resume();
  };

  /** Marks the stream finished, releases the port and unblocks all waiters. */
  const finish = (): void => {
    done = true;
    port1.close();
    resolveDone();
    wake();
  };

  port1.on('message', (msg: { value?: unknown; done?: boolean; error?: Error }) => {
    if (done) return; // ignore anything delivered after the stream finished
    if (msg.error) {
      error = msg.error;
      finish();
      return;
    }
    if (msg.done) {
      finish();
      return;
    }
    try {
      queue.push(deserializeArg(msg.value) as T);
    } catch (err) {
      // Surface a bad payload to the consumer as a stream error rather than
      // as an uncaught exception in the message handler.
      error = err instanceof Error ? err : new Error(String(err));
      finish();
      return;
    }
    wake();
  });
  port1.unref();

  return {
    iterable: {
      [Symbol.asyncIterator]() {
        return {
          async next(): Promise<IteratorResult<T>> {
            while (true) {
              if (queue.length > 0) {
                const value = queue.shift() as T;
                // Signal consumption to producer
                flags.fields.inflight.sub(1);
                flags.fields.inflight.notify();
                return { done: false, value };
              }
              // Buffered values are delivered before a stored error is thrown.
              if (error) throw error;
              if (done) return { done: true, value: undefined };
              await new Promise<void>((resolve) => {
                waiters.push(resolve);
              });
            }
          },
          async return(): Promise<IteratorResult<T>> {
            flags.fields.state.store(CANCEL);
            flags.fields.inflight.notify();
            // Cancelled: discard buffered values, then mark done and wake any
            // parked `next()` so it resolves instead of waiting on a closed port.
            queue.length = 0;
            finish();
            return { done: true, value: undefined };
          },
        };
      },
    },
    done: donePromise,
  };
}