Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 4e914ac426f4ddcfc1040c1954a58b465e24dbe1 243 lines 7.6 kB view raw
1import type { Worker } from 'node:worker_threads'; 2import { MessageChannel } from 'node:worker_threads'; 3import type { MessagePort, Transferable } from 'node:worker_threads'; 4import { transferableAbortSignal } from 'node:util'; 5import { freezeModule } from './registry.ts'; 6import { serializeArg, deserializeArg } from './shared/reconstruct.ts'; 7import { extractTransferables, collectTransferables } from './transfer.ts'; 8import { Task } from './task.ts'; 9import { StreamTask } from './stream-task.ts'; 10import { runStreamOnDedicated } from './dedicated-runner.ts'; 11import { CHANNEL, Channel } from './channel.ts'; 12import type { ChannelOptions } from './channel.ts'; 13 14let nextCallId = 0; 15const pending = new Map<number, { resolve: (value: any) => void; reject: (reason: any) => void }>(); 16const streamPortStack: MessagePort[][] = []; 17 18export function setupWorker(worker: Worker): void { 19 worker.on('message', (msg: { callId: number; value?: unknown; error?: string }) => { 20 const call = pending.get(msg.callId); 21 if (!call) return; 22 pending.delete(msg.callId); 23 // Unref the worker now that this call is done (re-ref happened in execute()) 24 worker.unref(); 25 if (msg.error !== undefined) { 26 call.reject(new Error(msg.error)); 27 } else { 28 call.resolve(deserializeArg(msg.value)); 29 } 30 }); 31} 32 33const DEFAULT_HIGH_WATER = 16; 34const LOW_WATER = 1; 35 36function pipeToPort(iterable: AsyncIterable<unknown>, port: MessagePort, highWaterMark: number): void { 37 let paused = false; 38 let resumed: (() => void) | null = null; 39 let cancelled = false; 40 41 port.on('message', (signal: string) => { 42 if (signal === 'pause') { 43 paused = true; 44 } else if (signal === 'resume') { 45 paused = false; 46 if (resumed) { 47 resumed(); 48 resumed = null; 49 } 50 } 51 }); 52 53 port.on('close', () => { 54 cancelled = true; 55 if (resumed) { 56 resumed(); 57 resumed = null; 58 } 59 }); 60 61 (async () => { 62 try { 63 for await (const value of iterable) { 64 if (cancelled) break; 65 if (paused) 66 await new Promise<void>((r) => { 67 resumed = r; 68 }); 69 if (cancelled) break; 70 const extracted = extractTransferables([value]); 71 const serialized = serializeArg(extracted.args[0]); 72 const transferList: Transferable[] = [...extracted.transfer]; 73 collectTransferables(extracted.args[0], transferList); 74 port.postMessage({ value: serialized, done: false }, transferList as any[]); 75 } 76 if (!cancelled) port.postMessage({ done: true }); 77 } catch (err) { 78 if (!cancelled) { 79 const message = err instanceof Error ? err.message : String(err); 80 port.postMessage({ done: true, error: message }); 81 } 82 } 83 try { 84 port.close(); 85 } catch {} 86 })(); 87} 88 89function isAsyncGenerator(arg: unknown): boolean { 90 return typeof arg === 'object' && arg !== null && (arg as any)[Symbol.toStringTag] === 'AsyncGenerator'; 91} 92 93function prepareArg(arg: unknown): unknown { 94 // Auto-detect AbortSignal args — mark transferable and include in transfer list 95 if (arg instanceof AbortSignal) { 96 const signal = transferableAbortSignal(arg); 97 streamPortStack[streamPortStack.length - 1].push(signal as unknown as MessagePort); 98 return signal; 99 } 100 // Auto-detect AsyncGenerator args — pipe via MessageChannel 101 if (isAsyncGenerator(arg)) { 102 const { port1, port2 } = new MessageChannel(); 103 port1.unref(); 104 pipeToPort(arg as AsyncIterable<unknown>, port1, DEFAULT_HIGH_WATER); 105 streamPortStack[streamPortStack.length - 1].push(port2); 106 return port2; 107 } 108 // Auto-detect StreamTask args — dispatch to dedicated worker, pipe output 109 if (arg instanceof StreamTask) { 110 const iterable = runStreamOnDedicated(arg.id, arg.args); 111 const { port1, port2 } = new MessageChannel(); 112 port1.unref(); 113 pipeToPort(iterable, port1, DEFAULT_HIGH_WATER); 114 streamPortStack[streamPortStack.length - 1].push(port2); 115 return port2; 116 } 117 // Channel wrapper — supports fan-out via shared distributor 118 if (arg instanceof Channel) { 119 const port2 = arg.addConsumer(); 120 streamPortStack[streamPortStack.length - 1].push(port2); 121 return port2; 122 } 123 if (arg instanceof Task) { 124 return { __task__: arg.uid, id: arg.id, args: arg.args.map(prepareArg) }; 125 } 126 return serializeArg(arg); 127} 128 129export function execute<T>(worker: Worker, id: string, args: unknown[]): Promise<T> { 130 const url = id.slice(0, id.lastIndexOf('#')); 131 freezeModule(url); 132 const callId = nextCallId++; 133 // Ref the worker for the duration of this call so the event loop stays alive 134 worker.ref(); 135 return new Promise<T>((resolve, reject) => { 136 pending.set(callId, { resolve, reject }); 137 const extracted = extractTransferables(args); 138 streamPortStack.push([]); 139 const preparedArgs = extracted.args.map(prepareArg); 140 const ports = streamPortStack.pop()!; 141 const msg = { callId, id, args: preparedArgs }; 142 worker.postMessage(msg, [...extracted.transfer, ...ports] as any[]); 143 }); 144} 145 146export function dispatchStream<T>( 147 worker: Worker, 148 id: string, 149 args: unknown[], 150 opts?: ChannelOptions, 151): AsyncIterable<T> { 152 const url = id.slice(0, id.lastIndexOf('#')); 153 freezeModule(url); 154 const highWater = opts?.highWaterMark ?? DEFAULT_HIGH_WATER; 155 156 const { port1, port2 } = new MessageChannel(); 157 158 const extracted = extractTransferables(args); 159 streamPortStack.push([]); 160 const preparedArgs = extracted.args.map(prepareArg); 161 const ports = streamPortStack.pop()!; 162 const msg = { id, args: preparedArgs, port: port2 }; 163 // Ref the worker for the duration of the stream so the event loop stays alive 164 worker.ref(); 165 worker.postMessage(msg, [...extracted.transfer, ...ports, port2] as any[]); 166 167 const queue: T[] = []; 168 let done = false; 169 let error: Error | null = null; 170 let paused = false; 171 let waiting: (() => void) | null = null; 172 let workerUnrefed = false; 173 174 function unrefWorkerOnce() { 175 if (!workerUnrefed) { 176 workerUnrefed = true; 177 worker.unref(); 178 } 179 } 180 181 port1.on('message', (msg: { value?: unknown; done?: boolean; error?: string }) => { 182 if (msg.error) { 183 error = new Error(msg.error); 184 done = true; 185 port1.close(); 186 unrefWorkerOnce(); 187 if (waiting) { 188 waiting(); 189 waiting = null; 190 } 191 return; 192 } 193 if (msg.done) { 194 done = true; 195 port1.close(); 196 unrefWorkerOnce(); 197 if (waiting) { 198 waiting(); 199 waiting = null; 200 } 201 return; 202 } 203 queue.push(deserializeArg(msg.value) as T); 204 if (waiting) { 205 waiting(); 206 waiting = null; 207 } 208 if (!paused && queue.length >= highWater) { 209 paused = true; 210 port1.postMessage('pause'); 211 } 212 }); 213 port1.unref(); 214 215 return { 216 [Symbol.asyncIterator]() { 217 return { 218 async next(): Promise<IteratorResult<T>> { 219 while (true) { 220 if (queue.length > 0) { 221 const value = queue.shift()!; 222 if (paused && queue.length <= LOW_WATER) { 223 paused = false; 224 port1.postMessage('resume'); 225 } 226 return { done: false, value }; 227 } 228 if (error) throw error; 229 if (done) return { done: true, value: undefined }; 230 await new Promise<void>((resolve) => { 231 waiting = resolve; 232 }); 233 } 234 }, 235 async return(): Promise<IteratorResult<T>> { 236 port1.close(); 237 unrefWorkerOnce(); 238 return { done: true, value: undefined }; 239 }, 240 }; 241 }, 242 }; 243}