import type { Worker } from 'node:worker_threads';
import { MessageChannel } from 'node:worker_threads';
import type { MessagePort, Transferable } from 'node:worker_threads';
import { transferableAbortSignal } from 'node:util';
import { freezeModule } from './registry.ts';
import { serializeArg, deserializeArg } from './shared/reconstruct.ts';
import { extractTransferables, collectTransferables } from './transfer.ts';
import { PromiseLikeTask } from './task.ts';
import { AsyncIterableTask } from './stream-task.ts';
import { runStreamOnDedicated } from './dedicated-runner.ts';
import { CHANNEL, Channel } from './channel.ts';
import type { ChannelOptions } from './channel.ts';

/** Settlement callbacks of one in-flight {@link execute} call. */
interface PendingCall {
  resolve: (value: unknown) => void;
  reject: (reason: any) => void;
}

/** Monotonic counter used to correlate a request with its reply message. */
let nextCallId = 0;

/** In-flight `execute` calls, keyed by call id. */
const pending = new Map<number, PendingCall>();

/**
 * Stack of port lists. Each argument-preparation pass pushes a fresh list,
 * `prepareArg` appends every port it creates to the top list, and the pass
 * pops the list again to build the transfer list for `postMessage`.
 */
const streamPortStack: MessagePort[][] = [];

/**
 * Attaches the reply handler that settles promises returned by {@link execute}.
 *
 * A reply whose `callId` is not in `pending` is ignored. A reply with an
 * `error` field rejects the call; otherwise the call resolves with
 * `deserializeArg(msg.value)`.
 *
 * @param worker Worker whose `'message'` events carry call replies.
 */
export function setupWorker(worker: Worker): void {
  worker.on('message', (msg: { callId: number; value?: unknown; error?: Error }) => {
    const call = pending.get(msg.callId);
    if (!call) return;
    pending.delete(msg.callId);
    if (msg.error !== undefined) {
      call.reject(msg.error);
    } else {
      call.resolve(deserializeArg(msg.value));
    }
  });
}

/** Default queue length at which a stream consumer asks the producer to pause. */
const DEFAULT_HIGH_WATER = 16;

/** Queue length at or below which a paused stream consumer asks to resume. */
const LOW_WATER = 1;

/**
 * Drains `iterable` into `port`, one `{ value, done: false }` message per item,
 * followed by `{ done: true }` (with an `error` field if iteration threw).
 * The port is closed once draining ends.
 *
 * Flow control is driven by the other side of the channel: a `'pause'` message
 * holds the loop before the next send until `'resume'` arrives. A `'close'`
 * event cancels the pipe; no further messages are posted after that.
 *
 * @param iterable Source of values to forward.
 * @param port Port the values are posted to.
 * @param highWaterMark Accepted for signature compatibility; this function does
 *   not read it (throttling relies solely on pause/resume signals).
 */
function pipeToPort(iterable: AsyncIterable<unknown>, port: MessagePort, highWaterMark: number): void {
  let paused = false;
  let resumed: (() => void) | null = null;
  let cancelled = false;

  // Releases the loop if it is currently parked waiting for 'resume'.
  const wake = (): void => {
    if (resumed) {
      resumed();
      resumed = null;
    }
  };

  port.on('message', (signal: string) => {
    if (signal === 'pause') {
      paused = true;
    } else if (signal === 'resume') {
      paused = false;
      wake();
    }
  });

  port.on('close', () => {
    cancelled = true;
    wake();
  });

  void (async () => {
    try {
      for await (const value of iterable) {
        if (cancelled) break;
        if (paused) {
          await new Promise<void>((r) => {
            resumed = r;
          });
        }
        // Re-check: the wait above is also released by 'close'.
        if (cancelled) break;
        const extracted = extractTransferables([value]);
        const serialized = serializeArg(extracted.args[0]);
        const transferList: Transferable[] = [...extracted.transfer];
        collectTransferables(extracted.args[0], transferList);
        port.postMessage({ value: serialized, done: false }, transferList as any[]);
      }
      if (!cancelled) port.postMessage({ done: true });
    } catch (err) {
      if (!cancelled) {
        port.postMessage({ done: true, error: err instanceof Error ? err : new Error(String(err)) });
      }
    }
    try {
      port.close();
    } catch {
      // Best effort: the port may already be closed.
    }
  })();
}

/** Returns true when `arg` is an object whose `Symbol.toStringTag` is `'AsyncGenerator'`. */
function isAsyncGenerator(arg: unknown): boolean {
  return typeof arg === 'object' && arg !== null && (arg as any)[Symbol.toStringTag] === 'AsyncGenerator';
}

/** Records `port` in the port list of the argument-preparation pass currently running. */
function trackPort(port: MessagePort): void {
  const current = streamPortStack[streamPortStack.length - 1];
  if (current === undefined) {
    throw new Error('prepareArg called outside of an argument-preparation pass');
  }
  current.push(port);
}

/**
 * Starts piping `iterable` through a fresh MessageChannel and returns the
 * consumer-side port, which is also recorded for transfer.
 */
function pipeViaChannel(iterable: AsyncIterable<unknown>): MessagePort {
  const { port1, port2 } = new MessageChannel();
  pipeToPort(iterable, port1, DEFAULT_HIGH_WATER);
  // unref() must come after pipeToPort: attaching a 'message' listener
  // re-refs the port, which would silently undo an earlier unref().
  port1.unref();
  trackPort(port2);
  return port2;
}

/**
 * Converts one call argument into the form that is posted to the worker.
 *
 * - `AbortSignal`: marked transferable and recorded for transfer.
 * - Async generator: piped through a MessageChannel; the consumer port is sent.
 * - `AsyncIterableTask`: started via `runStreamOnDedicated`, output piped likewise.
 * - `Channel`: a consumer port obtained from `addConsumer()` is sent.
 * - `PromiseLikeTask`: a `{ __task__, id, args }` descriptor with its own
 *   arguments prepared recursively.
 * - Anything else: `serializeArg(arg)`.
 *
 * Must run inside an argument-preparation pass (see `prepareCallArgs`).
 */
function prepareArg(arg: unknown): unknown {
  // Auto-detect AbortSignal args — mark transferable and include in transfer list
  if (arg instanceof AbortSignal) {
    const signal = transferableAbortSignal(arg);
    trackPort(signal as unknown as MessagePort);
    return signal;
  }

  // Auto-detect AsyncGenerator args — pipe via MessageChannel
  if (isAsyncGenerator(arg)) {
    return pipeViaChannel(arg as AsyncIterable<unknown>);
  }

  // Auto-detect StreamTask args — dispatch to dedicated worker, pipe output
  if (arg instanceof AsyncIterableTask) {
    return pipeViaChannel(runStreamOnDedicated(arg.id, arg.args));
  }

  // Channel wrapper — supports fan-out via shared distributor
  if (arg instanceof Channel) {
    const consumerPort = arg.addConsumer();
    trackPort(consumerPort);
    return consumerPort;
  }

  if (arg instanceof PromiseLikeTask) {
    return { __task__: arg.uid, id: arg.id, args: arg.args.map((inner: unknown) => prepareArg(inner)) };
  }

  return serializeArg(arg);
}

/**
 * Runs one argument-preparation pass: extracts transferables, prepares every
 * argument, and returns the prepared arguments together with the combined
 * transfer list (extracted transferables first, then ports created while
 * preparing). The pass's port list is always popped, even when preparation throws.
 */
function prepareCallArgs(args: unknown[]): { args: unknown[]; transfer: unknown[] } {
  const extracted = extractTransferables(args);
  const ports: MessagePort[] = [];
  streamPortStack.push(ports);
  try {
    const prepared = extracted.args.map((arg: unknown) => prepareArg(arg));
    return { args: prepared, transfer: [...extracted.transfer, ...ports] };
  } finally {
    streamPortStack.pop();
  }
}

/**
 * Returns the part of `id` before its last `#`. An id without `#` is returned
 * whole rather than losing its final character to `slice(0, -1)`.
 */
function moduleUrlOf(id: string): string {
  const hash = id.lastIndexOf('#');
  return hash === -1 ? id : id.slice(0, hash);
}

/**
 * Posts a call request to `worker` and returns a promise for its reply.
 *
 * The promise is settled by the handler installed with {@link setupWorker}.
 * If preparing the arguments or posting the message throws, the promise
 * rejects with that error and no pending entry is left behind.
 *
 * @param worker Worker that receives the `{ callId, id, args }` request.
 * @param id Task identifier; the part before the last `#` is passed to `freezeModule`.
 * @param args Call arguments; each is converted with `prepareArg`.
 * @returns Promise resolving with the deserialized reply value.
 */
export function execute(worker: Worker, id: string, args: unknown[]): Promise<unknown> {
  freezeModule(moduleUrlOf(id));
  const callId = nextCallId++;
  return new Promise<unknown>((resolve, reject) => {
    pending.set(callId, { resolve, reject });
    try {
      const prepared = prepareCallArgs(args);
      const request = { callId, id, args: prepared.args };
      worker.postMessage(request, prepared.transfer as any[]);
    } catch (err) {
      // The request never left this thread, so no reply will ever clear the entry.
      pending.delete(callId);
      reject(err);
    }
  });
}

/** Result of {@link dispatchStream}. */
export interface StreamDispatch<T = unknown> {
  /** Values received from the worker, in arrival order. */
  iterable: AsyncIterable<T>;
  /** Resolves when the stream finishes, fails, or is abandoned via `return()`. */
  done: Promise<void>;
}

/**
 * Posts a streaming request to `worker` and exposes the values it sends back
 * over a dedicated MessageChannel as an async iterable.
 *
 * Incoming values are queued. When the queue reaches the high-water mark a
 * `'pause'` message is sent to the producer; once a paused queue drains to
 * `LOW_WATER` or below, `'resume'` is sent. An `error` reported by the
 * producer is thrown from `next()` after already-queued values are consumed.
 *
 * @param worker Worker that receives the `{ id, args, port }` request.
 * @param id Task identifier; the part before the last `#` is passed to `freezeModule`.
 * @param args Call arguments; each is converted with `prepareArg`.
 * @param opts Optional settings; `highWaterMark` overrides `DEFAULT_HIGH_WATER`.
 * @returns The iterable and a promise that resolves when the stream ends.
 */
export function dispatchStream<T = unknown>(
  worker: Worker,
  id: string,
  args: unknown[],
  opts?: ChannelOptions,
): StreamDispatch<T> {
  freezeModule(moduleUrlOf(id));
  const highWater = opts?.highWaterMark ?? DEFAULT_HIGH_WATER;
  const { port1, port2 } = new MessageChannel();

  try {
    const prepared = prepareCallArgs(args);
    const request = { id, args: prepared.args, port: port2 };
    worker.postMessage(request, [...prepared.transfer, port2] as any[]);
  } catch (err) {
    // Nothing was sent; do not leave the channel open.
    port1.close();
    throw err;
  }

  let resolveDone: () => void = () => {};
  const donePromise = new Promise<void>((r) => {
    resolveDone = r;
  });

  const queue: T[] = [];
  let done = false;
  let error: Error | null = null;
  let paused = false;
  let waiting: (() => void) | null = null;

  // Releases a `next()` call that is parked on an empty queue.
  const wake = (): void => {
    if (waiting) {
      waiting();
      waiting = null;
    }
  };

  // Marks the stream as ended and releases everything tied to it.
  const finish = (): void => {
    done = true;
    port1.close();
    resolveDone();
    wake();
  };

  port1.on('message', (frame: { value?: unknown; done?: boolean; error?: Error }) => {
    if (frame.error || frame.done) {
      if (frame.error) error = frame.error;
      finish();
      return;
    }
    queue.push(deserializeArg(frame.value) as T);
    wake();
    if (!paused && queue.length >= highWater) {
      paused = true;
      port1.postMessage('pause');
    }
  });
  // After the listener is attached, so the implicit ref() it causes is undone.
  port1.unref();

  return {
    iterable: {
      [Symbol.asyncIterator]() {
        return {
          async next(): Promise<IteratorResult<T>> {
            while (true) {
              if (queue.length > 0) {
                const value = queue.shift()!;
                if (paused && queue.length <= LOW_WATER) {
                  paused = false;
                  port1.postMessage('resume');
                }
                return { done: false, value };
              }
              if (error) throw error;
              if (done) return { done: true, value: undefined };
              await new Promise<void>((resolve) => {
                waiting = resolve;
              });
            }
          },
          async return(): Promise<IteratorResult<T>> {
            // Drop buffered state so a later next() reports completion
            // instead of yielding stale items or waiting forever.
            queue.length = 0;
            error = null;
            finish();
            return { done: true, value: undefined };
          },
        };
      },
    },
    done: donePromise,
  };
}