Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at e343bf0d85181e241d3db15360bb7a080fd71c8d 195 lines 6.6 kB view raw
import type { Worker } from 'node:worker_threads';
import { MessageChannel } from 'node:worker_threads';
import type { MessagePort, Transferable } from 'node:worker_threads';
import { transferableAbortSignal } from 'node:util';
import { freezeModule } from './registry.ts';
import { serializeArg, deserializeArg } from './shared/reconstruct.ts';
import { extractTransferables, collectTransferables } from './transfer.ts';
import { PromiseLikeTask } from './task.ts';
import { AsyncIterableTask } from './stream-task.ts';
import { runStreamOnDedicated } from './dedicated-runner.ts';
import { CHANNEL, Channel } from './channel.ts';
import { pipeIterable, newPipeFlags, CANCEL } from './pipe.ts';
import type { ChannelOptions } from './channel.ts';

/** Monotonic id used to correlate a posted call with the worker's reply. */
let nextCallId = 0;

/** Settlers for calls that have been posted and not yet answered, keyed by call id. */
const pending = new Map<number, { resolve: (value: any) => void; reject: (reason: any) => void }>();

/**
 * Stack of transfer-list frames. `prepareArg` appends every port/signal it
 * creates to the top frame; the frame's owner pops it and merges it into the
 * `postMessage` transfer list.
 */
const streamPortStack: MessagePort[][] = [];

/**
 * Registers a `'message'` listener on `worker` that settles the pending call
 * whose id matches `msg.callId`.
 *
 * Messages with an unknown `callId` are ignored. A message whose `error` is
 * not `undefined` rejects the call with that error; otherwise the call
 * resolves with `deserializeArg(msg.value)`.
 *
 * @param worker - Worker whose replies should settle calls made via {@link execute}.
 */
export function setupWorker(worker: Worker): void {
  worker.on('message', (msg: { callId: number; value?: unknown; error?: Error }) => {
    const call = pending.get(msg.callId);
    if (!call) return;
    pending.delete(msg.callId);
    if (msg.error !== undefined) {
      call.reject(msg.error);
    } else {
      call.resolve(deserializeArg(msg.value));
    }
  });
}

const DEFAULT_HIGH_WATER = 16;
const LOW_WATER = 1;

/**
 * Starts piping `iterable` into `port` via `pipeIterable`, passing
 * `highWaterMark` as the `yieldEvery` option. The result of `pipeIterable` is
 * intentionally discarded (fire-and-forget).
 */
function pipeToPort(iterable: AsyncIterable<unknown>, port: MessagePort, highWaterMark: number): void {
  void pipeIterable(iterable, port, { extractTransfers: true, yieldEvery: highWaterMark });
}

/** Returns true when `arg` is an object whose `Symbol.toStringTag` is `'AsyncGenerator'`. */
function isAsyncGenerator(arg: unknown): arg is AsyncGenerator<unknown> {
  return (
    typeof arg === 'object' &&
    arg !== null &&
    (arg as { [Symbol.toStringTag]?: unknown })[Symbol.toStringTag] === 'AsyncGenerator'
  );
}

/**
 * Returns the module part of a task id: everything before the last `#`.
 * An id without `#` is returned unchanged; a bare `slice(0, lastIndexOf('#'))`
 * would evaluate to `slice(0, -1)` and silently drop its last character.
 */
function moduleUrlOf(id: string): string {
  const hashAt = id.lastIndexOf('#');
  return hashAt === -1 ? id : id.slice(0, hashAt);
}

/** Returns the transfer frame currently being filled, or throws when none is active. */
function currentTransferFrame(): MessagePort[] {
  const frame = streamPortStack[streamPortStack.length - 1];
  if (frame === undefined) {
    throw new Error('prepareArg called outside of an argument-preparation scope');
  }
  return frame;
}

/**
 * Pipes `iterable` through a fresh `MessageChannel`: the local end is unref'd
 * and fed by `pipeToPort`, the remote end is registered for transfer and returned.
 */
function pipeThroughChannel(iterable: AsyncIterable<unknown>): MessagePort {
  const { port1, port2 } = new MessageChannel();
  port1.unref();
  pipeToPort(iterable, port1, DEFAULT_HIGH_WATER);
  currentTransferFrame().push(port2);
  return port2;
}

/**
 * Converts one call argument into the form that is posted to the worker.
 * Anything that must travel in the transfer list is appended to the current
 * transfer frame as a side effect.
 */
function prepareArg(arg: unknown): unknown {
  // Auto-detect AbortSignal args — mark transferable and include in transfer list
  if (arg instanceof AbortSignal) {
    const signal = transferableAbortSignal(arg);
    currentTransferFrame().push(signal as unknown as MessagePort);
    return signal;
  }
  // Auto-detect AsyncGenerator args — pipe via MessageChannel
  if (isAsyncGenerator(arg)) {
    return pipeThroughChannel(arg);
  }
  // Auto-detect StreamTask args — dispatch to dedicated worker, pipe output
  if (arg instanceof AsyncIterableTask) {
    return pipeThroughChannel(runStreamOnDedicated(arg.id, arg.args));
  }
  // Channel wrapper — single distributor loop + per-consumer atomics backpressure.
  if (arg instanceof Channel) {
    const { port, flags, readySignal } = arg.addConsumer();
    currentTransferFrame().push(port);
    return { __stream__: true, port, flags, readySignal };
  }
  if (arg instanceof PromiseLikeTask) {
    // Nested task arguments are prepared recursively into the same transfer frame.
    return { __task__: arg.uid, id: arg.id, args: arg.args.map(prepareArg) };
  }
  return serializeArg(arg);
}

/**
 * Runs `prepareArg` over `args` inside a fresh transfer frame and returns the
 * prepared arguments together with everything registered for transfer.
 * The frame is popped even when preparation throws, so a failed call cannot
 * leave a stale frame on `streamPortStack`.
 */
function prepareArgs(args: readonly unknown[]): { preparedArgs: unknown[]; ports: MessagePort[] } {
  const frame: MessagePort[] = [];
  streamPortStack.push(frame);
  try {
    return { preparedArgs: args.map(prepareArg), ports: frame };
  } finally {
    streamPortStack.pop();
  }
}

/**
 * Posts a call for task `id` to `worker` and returns a promise for its result.
 *
 * The promise is settled by the listener installed with {@link setupWorker}.
 * If preparing the arguments or posting the message throws, the pending entry
 * is removed again and the promise rejects with that error.
 *
 * @param worker - Target worker.
 * @param id - Task id; the part before the last `#` is passed to `freezeModule`.
 * @param args - Call arguments.
 * @returns A promise settled from the worker's reply for this call.
 */
export function execute<T>(worker: Worker, id: string, args: unknown[]): Promise<T> {
  freezeModule(moduleUrlOf(id));
  const callId = nextCallId++;
  return new Promise<T>((resolve, reject) => {
    pending.set(callId, { resolve, reject });
    try {
      const extracted = extractTransferables(args);
      const { preparedArgs, ports } = prepareArgs(extracted.args);
      const msg = { callId, id, args: preparedArgs };
      worker.postMessage(msg, [...extracted.transfer, ...ports] as any[]);
    } catch (err) {
      // No reply will ever arrive for this call id, so drop the entry instead of leaking it.
      pending.delete(callId);
      reject(err);
    }
  });
}

/** Result of {@link dispatchStream}. */
export interface StreamDispatch<T> {
  /** Values received from the worker, in arrival order. */
  iterable: AsyncIterable<T>;
  /** Resolves once the stream has ended, failed, or been cancelled via `return()`. */
  done: Promise<void>;
}

/**
 * Posts a streaming call for task `id` to `worker` and exposes the values the
 * worker sends back as an async iterable.
 *
 * @param worker - Target worker.
 * @param id - Task id; the part before the last `#` is passed to `freezeModule`.
 * @param args - Call arguments.
 * @param opts - Channel options; only `highWaterMark` is read here.
 * @returns The iterable plus a promise that resolves when the stream is finished.
 * @throws Whatever argument preparation or `postMessage` throws; the local
 *   port is closed before the error propagates.
 */
export function dispatchStream<T>(
  worker: Worker,
  id: string,
  args: unknown[],
  opts?: ChannelOptions,
): StreamDispatch<T> {
  freezeModule(moduleUrlOf(id));
  // NOTE(review): computed but never forwarded to the worker or applied to
  // `flags` in this function — confirm where the high-water mark is meant to go.
  const highWater = opts?.highWaterMark ?? DEFAULT_HIGH_WATER;

  const { port1, port2 } = new MessageChannel();
  // Atomics-based backpressure: shared inflight count + cancel flag.
  // Worker parks on flags.inflight when it hits highWater; parent decrements
  // + notifies on pull. Replaces pause/resume messages and the
  // adaptive-yield setImmediate dance.
  const flags = newPipeFlags();

  try {
    const extracted = extractTransferables(args);
    const { preparedArgs, ports } = prepareArgs(extracted.args);
    const msg = { id, args: preparedArgs, port: port2, flags: flags.buffer };
    worker.postMessage(msg, [...extracted.transfer, ...ports, port2] as any[]);
  } catch (err) {
    // The call never reached the worker; do not leave the channel open.
    port1.close();
    throw err;
  }

  let resolveDone!: () => void;
  const donePromise = new Promise<void>((r) => {
    resolveDone = r;
  });

  const queue: T[] = [];
  let done = false;
  let error: Error | null = null;
  // Every parked next() call gets its own slot, so concurrent pulls cannot
  // overwrite each other's resolver.
  let waiters: Array<() => void> = [];

  /** Resumes every parked next() call; each re-checks the stream state itself. */
  const wake = (): void => {
    const parked = waiters;
    waiters = [];
    for (const resume of parked) resume();
  };

  /** Marks the stream finished, closes the local port, and releases all waiters. */
  const finish = (): void => {
    done = true;
    port1.close();
    resolveDone();
    wake();
  };

  port1.on('message', (msg: { value?: unknown; done?: boolean; error?: Error }) => {
    // Ignore anything delivered after the stream has already been finished or cancelled.
    if (done) return;
    if (msg.error) {
      error = msg.error;
      finish();
      return;
    }
    if (msg.done) {
      finish();
      return;
    }
    queue.push(deserializeArg(msg.value) as T);
    wake();
  });
  port1.unref();

  return {
    iterable: {
      [Symbol.asyncIterator]() {
        return {
          async next(): Promise<IteratorResult<T>> {
            while (true) {
              if (queue.length > 0) {
                const value = queue.shift() as T;
                // Signal consumption to producer
                flags.inflight.sub(1);
                flags.inflight.notify();
                return { done: false, value };
              }
              // Buffered values are drained before a stored error is surfaced.
              if (error) throw error;
              if (done) return { done: true, value: undefined };
              await new Promise<void>((resolve) => {
                waiters.push(resolve);
              });
            }
          },
          async return(): Promise<IteratorResult<T>> {
            flags.state.store(CANCEL);
            flags.inflight.notify();
            // The consumer is finished: drop buffered values and release any
            // parked next() so it resolves as done instead of waiting on a
            // closed port forever.
            queue.length = 0;
            finish();
            return { done: true, value: undefined };
          },
        };
      },
    },
    done: donePromise,
  };
}