Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at e2cebd539403475e2f4e79d55ca1a84a0ce3510d 147 lines 4.3 kB view raw
import { MessageChannel, MessagePort } from 'node:worker_threads';
import type { Transferable } from 'node:worker_threads';
import { serializeArg } from './shared/reconstruct.ts';
import { collectTransferables } from './transfer.ts';
import { AsyncIterableTask } from './stream-task.ts';
import { runStreamOnDedicated } from './dedicated-runner.ts';

/** Brand key set on every {@link Channel} instance; registered globally via `Symbol.for`. */
const CHANNEL = Symbol.for('moroutine.channel');

/** Options for configuring a channel. */
export interface ChannelOptions {
  /** Maximum number of items buffered before signaling backpressure. Defaults to 16. */
  highWaterMark?: number;
}

// NOTE(review): this constant is not referenced anywhere in this file — presumably the
// receiving side applies the high-water mark; confirm, or wire it up / remove it.
const DEFAULT_HIGH_WATER = 16;

/** Distributor-side bookkeeping for one attached consumer. */
interface Consumer {
  /** Distributor end of the MessageChannel; the peer port is handed to the consumer. */
  port: MessagePort;
  /** False after the consumer sent 'pause', true again after it sent 'resume'. */
  ready: boolean;
}

/** Terminal message posted to a consumer when the stream has ended (normally or with an error). */
type EndMessage = { done: true } | { done: true; error: string };

/** Closes a port on a best-effort basis; a failure to close must not break stream shutdown. */
function closeQuietly(port: MessagePort): void {
  try {
    port.close();
  } catch {
    // Deliberately ignored: there is nothing useful to do if close() fails here.
  }
}

/**
 * Pulls values from a single async source and hands each value to exactly one
 * consumer port (the first consumer currently marked ready).
 *
 * Consumers steer the flow by posting the strings 'pause' and 'resume' on their
 * port. Pulling stops while no consumer is ready and restarts on 'resume' or when
 * a new consumer is added.
 */
class Distributor<T> {
  private consumers: Consumer[] = [];
  private readonly sourceIterator: AsyncIterator<T>;
  private done = false;
  private error: Error | null = null;
  /** Guards against running more than one pull loop at a time. */
  private pulling = false;
  /**
   * A value already taken from the source that could not be delivered because no
   * consumer was ready once it arrived. Wrapped so a `null`/`undefined` value is
   * distinguishable from "nothing pending".
   */
  private pending: { value: T } | null = null;

  /**
   * @param source - Async iterable to drain; its iterator is obtained immediately.
   */
  constructor(source: AsyncIterable<T>) {
    this.sourceIterator = source[Symbol.asyncIterator]();
  }

  /**
   * Attaches a new consumer.
   *
   * If the stream has already ended, the terminal message is posted and the local
   * port is closed right away; otherwise the consumer is registered as ready and
   * the pull loop is (re)started.
   *
   * @returns The port the consumer should listen on.
   */
  addConsumer(): MessagePort {
    const { port1, port2 } = new MessageChannel();
    // NOTE(review): Node documents that attaching a 'message' listener refs the port
    // again, so this unref() may be undone by the listener registered below — confirm
    // which behavior is intended before reordering.
    port1.unref();

    if (this.done) {
      // Stream already finished: report the outcome without registering a dead consumer.
      port1.postMessage(this.endMessage());
      closeQuietly(port1);
      return port2;
    }

    const consumer: Consumer = { port: port1, ready: true };

    port1.on('message', (signal: unknown) => {
      if (signal === 'pause') {
        consumer.ready = false;
      } else if (signal === 'resume') {
        consumer.ready = true;
        void this.tryPull();
      }
    });

    port1.on('close', () => {
      this.consumers = this.consumers.filter((c) => c !== consumer);
    });

    this.consumers.push(consumer);
    void this.tryPull();

    return port2;
  }

  /** Builds the terminal message, including the error text only when the stream failed. */
  private endMessage(): EndMessage {
    return this.error ? { done: true, error: this.error.message } : { done: true };
  }

  /** Marks the stream as ended and notifies and closes every registered consumer. */
  private finish(error: Error | null): void {
    this.done = true;
    this.error = error;
    this.pending = null;

    const message = this.endMessage();
    for (const c of this.consumers) {
      c.port.postMessage(message);
      closeQuietly(c.port);
    }
  }

  /**
   * Runs the pull loop until the source ends, an error occurs, or no consumer is ready.
   * Re-entrant calls are no-ops while a loop is already active.
   */
  private async tryPull(): Promise<void> {
    if (this.pulling || this.done) return;
    this.pulling = true;

    try {
      while (!this.done) {
        // Do not take a value from the source while nobody can receive it.
        if (!this.consumers.some((c) => c.ready)) break;

        // Deliver a previously undelivered value first; otherwise pull a fresh one.
        let item = this.pending;
        this.pending = null;
        if (item === null) {
          const result = await this.sourceIterator.next();
          if (result.done) {
            this.finish(null);
            break;
          }
          item = { value: result.value };
        }

        // Choose the target only now: a consumer selected before the await may have
        // closed or paused in the meantime, and posting to a closed port would
        // silently drop the value.
        const target = this.consumers.find((c) => c.ready);
        if (!target) {
          this.pending = item; // keep it for the next 'resume' / new consumer
          break;
        }

        const serialized = serializeArg(item.value);
        const transferList: Transferable[] = [];
        collectTransferables(item.value, transferList);
        target.port.postMessage({ value: serialized, done: false }, transferList as any[]);

        // After sending, yield to the event loop so pause signals can arrive.
        await new Promise<void>((resolve) => setImmediate(resolve));
      }
    } catch (err) {
      this.finish(err instanceof Error ? err : new Error(String(err)));
    } finally {
      // Always release the guard so a later 'resume' or addConsumer() can restart pulling.
      this.pulling = false;
    }
  }
}

/**
 * Handle wrapping an async source that can be attached to by several consumers.
 * The underlying {@link Distributor} is created lazily on the first `addConsumer()` call.
 */
export class Channel<T> {
  private distributor: Distributor<T> | null = null;
  private source: AsyncIterable<T>;
  readonly [CHANNEL] = true;

  /**
   * @param source - The async iterable (or AsyncIterableTask) whose values are distributed.
   */
  constructor(source: AsyncIterable<T>) {
    this.source = source;
  }

  /**
   * Attaches a consumer, creating the distributor on first use.
   *
   * When the source is an `AsyncIterableTask`, it is first started through
   * `runStreamOnDedicated` using the task's `id` and `args`.
   *
   * @returns The port the consumer should listen on.
   */
  addConsumer(): MessagePort {
    if (!this.distributor) {
      let source: AsyncIterable<T> = this.source;
      if (source instanceof AsyncIterableTask) {
        source = runStreamOnDedicated((source as any).id, (source as any).args) as AsyncIterable<T>;
      }
      this.distributor = new Distributor<T>(source);
    }
    return this.distributor.addConsumer();
  }
}

/**
 * Creates a channel for streaming values to workers. Supports fan-out when
 * the same channel is passed to multiple tasks.
 *
 * NOTE(review): `opts` is accepted but not read by this function, so
 * `highWaterMark` has no effect here — confirm where it is meant to be applied.
 *
 * @param iterable - The async iterable or StreamTask to distribute.
 * @param opts - Optional. highWaterMark controls backpressure buffering (default: 16).
 * @returns A Channel, typed as AsyncIterable<T> for transparent moroutine arg use.
 */
export function channel<T>(iterable: AsyncIterable<T>, opts?: ChannelOptions): AsyncIterable<T> {
  return new Channel(iterable) as unknown as AsyncIterable<T>;
}

export { CHANNEL };