import { MessageChannel, MessagePort } from 'node:worker_threads'; import type { Transferable } from 'node:worker_threads'; import { serializeArg } from './shared/reconstruct.ts'; import { collectTransferables } from './transfer.ts'; import { AsyncIterableTask } from './stream-task.ts'; import { runStreamOnDedicated } from './dedicated-runner.ts'; const CHANNEL = Symbol.for('moroutine.channel'); /** Options for configuring a channel. */ export interface ChannelOptions { /** Maximum number of items buffered before signaling backpressure. Defaults to 16. */ highWaterMark?: number; } const DEFAULT_HIGH_WATER = 16; interface Consumer { port: MessagePort; ready: boolean; } class Distributor { private consumers: Consumer[] = []; private sourceIterator: AsyncIterator; private done = false; private error: Error | null = null; private pulling = false; constructor(source: AsyncIterable) { this.sourceIterator = source[Symbol.asyncIterator](); } addConsumer(): MessagePort { const { port1, port2 } = new MessageChannel(); port1.unref(); const consumer: Consumer = { port: port1, ready: true }; port1.on('message', (signal: string) => { if (signal === 'pause') { consumer.ready = false; } else if (signal === 'resume') { consumer.ready = true; this.tryPull(); } }); port1.on('close', () => { this.consumers = this.consumers.filter((c) => c !== consumer); }); this.consumers.push(consumer); if (this.done) { if (this.error) { port1.postMessage({ done: true, error: this.error.message }); } else { port1.postMessage({ done: true }); } try { port1.close(); } catch {} } else { this.tryPull(); } return port2; } private async tryPull(): Promise { if (this.pulling || this.done) return; this.pulling = true; try { while (!this.done) { const readyConsumer = this.consumers.find((c) => c.ready); if (!readyConsumer) break; // no ready consumers, pause pulling const result = await this.sourceIterator.next(); if (result.done) { this.done = true; for (const c of this.consumers) { c.port.postMessage({ done: true }); try { c.port.close(); } catch {} } break; } const serialized = serializeArg(result.value); const transferList: Transferable[] = []; collectTransferables(result.value, transferList); readyConsumer.port.postMessage({ value: serialized, done: false }, transferList as any[]); // After sending, yield to event loop so pause signals can arrive await new Promise((r) => setImmediate(r)); } } catch (err) { this.done = true; this.error = err instanceof Error ? err : new Error(String(err)); for (const c of this.consumers) { c.port.postMessage({ done: true, error: this.error.message }); try { c.port.close(); } catch {} } } this.pulling = false; } } export class Channel { private distributor: Distributor | null = null; private source: AsyncIterable; readonly [CHANNEL] = true; constructor(source: AsyncIterable) { this.source = source; } addConsumer(): MessagePort { if (!this.distributor) { let source: AsyncIterable = this.source; if (source instanceof AsyncIterableTask) { source = runStreamOnDedicated((source as any).id, (source as any).args) as AsyncIterable; } this.distributor = new Distributor(source); } return this.distributor.addConsumer(); } } /** * Creates a channel for streaming values to workers. Supports fan-out when * the same channel is passed to multiple tasks. * @param iterable - The async iterable or StreamTask to distribute. * @param opts - Optional. highWaterMark controls backpressure buffering (default: 16). * @returns A Channel, typed as AsyncIterable for transparent moroutine arg use. */ export function channel(iterable: AsyncIterable, opts?: ChannelOptions): AsyncIterable { return new Channel(iterable) as unknown as AsyncIterable; } export { CHANNEL };