Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 24741da9f7ae74fbc28ff1941e4276ba74e6df3c 196 lines 6.9 kB view raw
import type { Worker } from 'node:worker_threads';
import { MessageChannel } from 'node:worker_threads';
import type { MessagePort, Transferable } from 'node:worker_threads';
import { transferableAbortSignal } from 'node:util';
import { freezeModule } from './registry.ts';
import { serializeArg, deserializeArg } from './shared/reconstruct.ts';
import { extractTransferables, collectTransferables } from './transfer.ts';
import { PromiseLikeTask } from './task.ts';
import { AsyncIterableTask } from './stream-task.ts';
import { runStreamOnDedicated } from './dedicated-runner.ts';
import { Channel } from './channel.ts';
import { pipeIterable, newPipeFlags, CANCEL, DEFAULT_HIGH_WATER, serializeStreamHandle } from './pipe.ts';
import type { StreamHandle, SerializedStreamHandle } from './pipe.ts';
import type { ChannelOptions } from './channel.ts';

/** Monotonic id used to correlate a posted call with the worker's reply. */
let nextCallId = 0;

/** Calls posted by `execute` that have not been answered yet, keyed by call id. */
const pending = new Map<number, { resolve: (value: any) => void; reject: (reason: any) => void }>();

/**
 * Stack of transfer-list frames. Each `execute` / `dispatchStream` invocation
 * pushes a frame while its arguments are being prepared; `prepareArg` appends
 * every object that must be added to that invocation's transfer list to the
 * top frame.
 */
const streamPortStack: MessagePort[][] = [];

/**
 * Installs the reply handler on `worker`.
 *
 * Each incoming message is matched to a pending call by `callId`; the call is
 * rejected with `msg.error` when that field is defined, otherwise resolved
 * with the deserialized `msg.value`. Messages whose `callId` is not pending
 * are ignored.
 */
export function setupWorker(worker: Worker): void {
  worker.on('message', (msg: { callId: number; value?: unknown; error?: Error }) => {
    const call = pending.get(msg.callId);
    if (!call) return;
    pending.delete(msg.callId);
    if (msg.error !== undefined) {
      call.reject(msg.error);
    } else {
      call.resolve(deserializeArg(msg.value));
    }
  });
}

/** Returns true when `arg` is an object whose `Symbol.toStringTag` is `'AsyncGenerator'`. */
function isAsyncGenerator(arg: unknown): boolean {
  return typeof arg === 'object' && arg !== null && (arg as any)[Symbol.toStringTag] === 'AsyncGenerator';
}

/**
 * Returns the transfer-list frame of the dispatch currently preparing its
 * arguments.
 *
 * @throws Error when no dispatch is in progress (the stack is empty).
 */
function currentPorts(): MessagePort[] {
  const frame = streamPortStack[streamPortStack.length - 1];
  if (frame === undefined) {
    throw new Error('prepareArg called outside of an active dispatch');
  }
  return frame;
}

/**
 * Pipes an AsyncIterable from the parent thread to a worker-facing MessagePort.
 * Creates a producer-side pipeIterable loop with atomics backpressure and
 * returns a serialized consumer-side handle for the worker. The consumer port
 * is recorded in the current transfer-list frame.
 */
function pipeArgToWorker(iterable: AsyncIterable<unknown>): SerializedStreamHandle {
  const { port1, port2 } = new MessageChannel();
  port1.unref();
  const flags = newPipeFlags();
  const producerHandle: StreamHandle = { port: port1, flags, highWater: DEFAULT_HIGH_WATER };
  // Fire-and-forget: the producer loop runs in the background.
  void pipeIterable(iterable, producerHandle, { extractTransfers: true });
  currentPorts().push(port2);
  return serializeStreamHandle({ port: port2, flags, highWater: DEFAULT_HIGH_WATER });
}

/**
 * Converts one call argument into the form posted to the worker, recording
 * anything that must be transferred in the current transfer-list frame.
 */
function prepareArg(arg: unknown): unknown {
  // Auto-detect AbortSignal args — mark transferable and include in transfer list
  if (arg instanceof AbortSignal) {
    const signal = transferableAbortSignal(arg);
    currentPorts().push(signal as unknown as MessagePort);
    return signal;
  }
  // Auto-detect AsyncGenerator args — pipe via MessageChannel + atomics backpressure
  if (isAsyncGenerator(arg)) {
    return pipeArgToWorker(arg as AsyncIterable<unknown>);
  }
  // Auto-detect StreamTask args — dispatch to dedicated worker, pipe output
  if (arg instanceof AsyncIterableTask) {
    return pipeArgToWorker(runStreamOnDedicated(arg.id, arg.args) as AsyncIterable<unknown>);
  }
  // Channel wrapper — single distributor loop + per-consumer atomics backpressure.
  if (arg instanceof Channel) {
    const handle = arg.addConsumer();
    currentPorts().push(handle.port);
    return serializeStreamHandle(handle);
  }
  // Nested task: forwarded as a descriptor whose own args are prepared recursively.
  if (arg instanceof PromiseLikeTask) {
    return { __task__: arg.uid, id: arg.id, args: arg.args.map((inner: unknown) => prepareArg(inner)) };
  }
  return serializeArg(arg);
}

/**
 * Prepares every argument inside a fresh transfer-list frame and returns the
 * prepared arguments together with the objects collected in that frame.
 *
 * The frame is popped in a `finally` so that an exception thrown while
 * preparing an argument cannot leave a stale frame on `streamPortStack`.
 */
function prepareArgs(args: readonly unknown[]): { args: unknown[]; ports: MessagePort[] } {
  const ports: MessagePort[] = [];
  streamPortStack.push(ports);
  try {
    return { args: args.map((arg) => prepareArg(arg)), ports };
  } finally {
    streamPortStack.pop();
  }
}

/**
 * Posts a call for function `id` to `worker` and returns a promise for its
 * result. The promise is settled by the handler installed with `setupWorker`.
 *
 * @param worker Worker that receives the call message.
 * @param id     Function id; the part before the last `#` is passed to `freezeModule`.
 * @param args   Call arguments; each is passed through `prepareArg`.
 * @returns A promise settled when the worker replies, or rejected immediately
 *          if preparing or posting the message throws.
 */
export function execute<T>(worker: Worker, id: string, args: unknown[]): Promise<T> {
  const url = id.slice(0, id.lastIndexOf('#'));
  freezeModule(url);
  const callId = nextCallId++;
  return new Promise<T>((resolve, reject) => {
    pending.set(callId, { resolve, reject });
    try {
      const extracted = extractTransferables(args);
      const prepared = prepareArgs(extracted.args);
      const request = { callId, id, args: prepared.args };
      worker.postMessage(request, [...extracted.transfer, ...prepared.ports] as any[]);
    } catch (err) {
      // The message never left this thread, so no reply will ever arrive:
      // drop the bookkeeping entry instead of leaking it.
      pending.delete(callId);
      reject(err);
    }
  });
}

/** Result of `dispatchStream`: the consumer-side iterable plus a completion promise. */
export interface StreamDispatch<T> {
  /** Values received from the worker, in arrival order. */
  iterable: AsyncIterable<T>;
  /** Resolves when the stream ends, errors, or is cancelled via `return()`. */
  done: Promise<void>;
}

/**
 * Starts streaming function `id` on `worker` and exposes its output as an
 * async iterable.
 *
 * @param worker Worker that receives the stream request.
 * @param id     Function id; the part before the last `#` is passed to `freezeModule`.
 * @param args   Call arguments; each is passed through `prepareArg`.
 * @param opts   Optional settings; `highWaterMark` overrides `DEFAULT_HIGH_WATER`.
 * @returns The iterable and a promise that resolves once the stream is finished.
 * @throws Whatever preparing the arguments or `postMessage` throws; the
 *         freshly created channel is closed before rethrowing.
 */
export function dispatchStream<T>(
  worker: Worker,
  id: string,
  args: unknown[],
  opts?: ChannelOptions,
): StreamDispatch<T> {
  const url = id.slice(0, id.lastIndexOf('#'));
  freezeModule(url);
  const highWater = opts?.highWaterMark ?? DEFAULT_HIGH_WATER;

  const { port1, port2 } = new MessageChannel();
  // Atomics-based backpressure: shared inflight count + cancel flag.
  // Worker parks on flags.inflight when it hits highWater; parent decrements
  // + notifies on pull. Replaces pause/resume messages and the
  // adaptive-yield setImmediate dance.
  const flags = newPipeFlags();
  const handle: StreamHandle = { port: port2, flags, highWater };

  try {
    const extracted = extractTransferables(args);
    const prepared = prepareArgs(extracted.args);
    const request = { id, args: prepared.args, stream: serializeStreamHandle(handle) };
    worker.postMessage(request, [...extracted.transfer, ...prepared.ports, port2] as any[]);
  } catch (err) {
    // Nothing was sent, so nobody will ever use the channel: release it.
    port1.close();
    throw err;
  }

  let resolveDone!: () => void;
  const donePromise = new Promise<void>((r) => {
    resolveDone = r;
  });

  const queue: T[] = [];
  let done = false;
  let error: Error | null = null;
  // Every parked next() call registers here; a list (rather than a single
  // slot) keeps concurrent next() calls from overwriting each other.
  let waiters: Array<() => void> = [];

  /** Resumes every parked next() call so it can re-check the stream state. */
  const wake = (): void => {
    if (waiters.length === 0) return;
    const parked = waiters;
    waiters = [];
    for (const resume of parked) resume();
  };

  /** Marks the stream finished, closes the port and releases all waiters. */
  const finish = (): void => {
    done = true;
    port1.close();
    resolveDone();
    wake();
  };

  port1.on('message', (incoming: { value?: unknown; done?: boolean; error?: Error }) => {
    // Ignore anything delivered after the stream was finished or cancelled.
    if (done) return;
    if (incoming.error) {
      error = incoming.error;
      finish();
      return;
    }
    if (incoming.done) {
      finish();
      return;
    }
    queue.push(deserializeArg(incoming.value) as T);
    wake();
  });
  port1.unref();

  return {
    iterable: {
      [Symbol.asyncIterator]() {
        return {
          async next(): Promise<IteratorResult<T>> {
            while (true) {
              if (queue.length > 0) {
                const value = queue.shift()!;
                // Signal consumption to producer
                flags.fields.inflight.sub(1);
                flags.fields.inflight.notify();
                return { done: false, value };
              }
              if (error) throw error;
              if (done) return { done: true, value: undefined };
              await new Promise<void>((resolve) => {
                waiters.push(resolve);
              });
            }
          },
          async return(): Promise<IteratorResult<T>> {
            // Tell the producer to stop and wake it if it is parked on backpressure.
            flags.fields.state.store(CANCEL);
            flags.fields.inflight.notify();
            // Drop buffered values and mark the stream finished so that any
            // parked or later next() call completes instead of waiting forever.
            queue.length = 0;
            finish();
            return { done: true, value: undefined };
          },
        };
      },
    },
    done: donePromise,
  };
}