Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 5e7769fd426e1a9570185b89bf20548507157c9f 241 lines 7.4 kB view raw
import type { Worker } from 'node:worker_threads';
import { MessageChannel } from 'node:worker_threads';
import type { MessagePort, Transferable } from 'node:worker_threads';
import { transferableAbortSignal } from 'node:util';
import { freezeModule } from './registry.ts';
import { serializeArg, deserializeArg } from './shared/reconstruct.ts';
import { extractTransferables, collectTransferables } from './transfer.ts';
import { PromiseLikeTask } from './task.ts';
import { AsyncIterableTask } from './stream-task.ts';
import { runStreamOnDedicated } from './dedicated-runner.ts';
import { CHANNEL, Channel } from './channel.ts';
import type { ChannelOptions } from './channel.ts';

/** Monotonic counter used to correlate `execute` requests with worker replies. */
let nextCallId = 0;

/** Settlers of in-flight `execute` calls, keyed by call id. */
const pending = new Map<number, { resolve: (value: any) => void; reject: (reason: any) => void }>();

/**
 * Stack of transfer lists, one frame per dispatch that is currently preparing
 * its arguments. `prepareArg` appends to the top frame every port (or
 * transferable signal) it creates so the dispatcher can transfer them.
 */
const streamPortStack: MessagePort[][] = [];

/**
 * Installs the reply handler on `worker`: each message carrying a known
 * `callId` settles the matching promise created by `execute`.
 * Messages with an unknown `callId` are ignored.
 */
export function setupWorker(worker: Worker): void {
  worker.on('message', (msg: { callId: number; value?: unknown; error?: Error }) => {
    const call = pending.get(msg.callId);
    if (!call) return;
    pending.delete(msg.callId);
    if (msg.error !== undefined) {
      call.reject(msg.error);
      return;
    }
    call.resolve(deserializeArg(msg.value));
  });
}

/** Default number of buffered items at which a stream consumer asks the producer to pause. */
const DEFAULT_HIGH_WATER = 16;

/** Buffered-item count at or below which a paused stream consumer asks the producer to resume. */
const LOW_WATER = 1;

/**
 * Returns the module part of a task id: everything before the last `#`.
 * An id without `#` is returned unchanged (slicing at -1 would silently drop
 * its last character).
 */
function moduleUrlOf(id: string): string {
  const hashIndex = id.lastIndexOf('#');
  return hashIndex === -1 ? id : id.slice(0, hashIndex);
}

/**
 * Drains `iterable` into `port`, posting `{ value, done: false }` per item and
 * a final `{ done: true }` (with `error` when iteration threw).
 *
 * The other side steers the flow by posting the strings `'pause'` and
 * `'resume'`; closing the port cancels the pipe. The port is always closed
 * when the pipe finishes.
 *
 * @param highWaterMark Accepted for signature compatibility; this function
 *   does not read it — pausing is driven solely by the consumer's signals.
 */
function pipeToPort(iterable: AsyncIterable<unknown>, port: MessagePort, highWaterMark: number): void {
  let paused = false;
  let cancelled = false;
  let resumed: (() => void) | null = null;

  // Releases the producer loop if it is parked waiting for 'resume'.
  const wake = (): void => {
    const resume = resumed;
    resumed = null;
    resume?.();
  };

  port.on('message', (signal: string) => {
    if (signal === 'pause') {
      paused = true;
    } else if (signal === 'resume') {
      paused = false;
      wake();
    }
  });

  port.on('close', () => {
    cancelled = true;
    wake();
  });

  void (async () => {
    try {
      for await (const value of iterable) {
        if (cancelled) break;
        if (paused) {
          await new Promise<void>((r) => {
            resumed = r;
          });
        }
        // The port may have been closed while we were parked.
        if (cancelled) break;
        const extracted = extractTransferables([value]);
        const serialized = serializeArg(extracted.args[0]);
        const transferList: Transferable[] = [...extracted.transfer];
        collectTransferables(extracted.args[0], transferList);
        port.postMessage({ value: serialized, done: false }, transferList as any[]);
      }
      if (!cancelled) port.postMessage({ done: true });
    } catch (err) {
      if (!cancelled) {
        const error = err instanceof Error ? err : new Error(String(err));
        try {
          port.postMessage({ done: true, error });
        } catch {
          // The error object itself could not be posted; send a plain Error
          // so the consumer still observes termination.
          port.postMessage({ done: true, error: new Error(error.message) });
        }
      }
    } finally {
      // Always release the port, even when reporting the failure threw.
      try {
        port.close();
      } catch {}
    }
  })();
}

/** True when `arg` is an object whose `Symbol.toStringTag` is `'AsyncGenerator'`. */
function isAsyncGenerator(arg: unknown): boolean {
  return typeof arg === 'object' && arg !== null && (arg as any)[Symbol.toStringTag] === 'AsyncGenerator';
}

/** Returns the transfer list of the dispatch currently preparing arguments. */
function currentPorts(): MessagePort[] {
  const ports = streamPortStack[streamPortStack.length - 1];
  if (ports === undefined) {
    throw new Error('prepareArg called outside of an active dispatch');
  }
  return ports;
}

/** Starts piping `iterable` into a fresh channel and returns the consumer-side port. */
function openPipe(iterable: AsyncIterable<unknown>): MessagePort {
  const { port1, port2 } = new MessageChannel();
  port1.unref();
  pipeToPort(iterable, port1, DEFAULT_HIGH_WATER);
  return port2;
}

/**
 * Converts one call argument into its wire form. Ports and transferable
 * signals created along the way are appended to the current dispatch's
 * transfer list.
 */
function prepareArg(arg: unknown): unknown {
  // Auto-detect AbortSignal args — mark transferable and include in transfer list
  if (arg instanceof AbortSignal) {
    const ports = currentPorts();
    const signal = transferableAbortSignal(arg);
    ports.push(signal as unknown as MessagePort);
    return signal;
  }
  // Auto-detect AsyncGenerator args — pipe via MessageChannel
  if (isAsyncGenerator(arg)) {
    const ports = currentPorts();
    const port = openPipe(arg as AsyncIterable<unknown>);
    ports.push(port);
    return port;
  }
  // Auto-detect StreamTask args — dispatch to dedicated worker, pipe output
  if (arg instanceof AsyncIterableTask) {
    const ports = currentPorts();
    const port = openPipe(runStreamOnDedicated(arg.id, arg.args));
    ports.push(port);
    return port;
  }
  // Channel wrapper — supports fan-out via shared distributor
  if (arg instanceof Channel) {
    const ports = currentPorts();
    const port = arg.addConsumer();
    ports.push(port);
    return port;
  }
  if (arg instanceof PromiseLikeTask) {
    return { __task__: arg.uid, id: arg.id, args: arg.args.map(prepareArg) };
  }
  return serializeArg(arg);
}

/**
 * Prepares `args` for posting: extracts transferables, converts every
 * argument with `prepareArg`, and returns the wire arguments together with
 * the combined transfer list.
 *
 * The frame pushed on `streamPortStack` is popped in `finally`, so a throwing
 * argument cannot leave a stale frame that later dispatches would write into.
 */
function prepareCall(args: unknown[]): { args: unknown[]; transfer: unknown[] } {
  const extracted = extractTransferables(args);
  const ports: MessagePort[] = [];
  streamPortStack.push(ports);
  try {
    const prepared = extracted.args.map(prepareArg);
    return { args: prepared, transfer: [...extracted.transfer, ...ports] };
  } finally {
    streamPortStack.pop();
  }
}

/**
 * Posts a call for task `id` to `worker` and returns a promise that is
 * settled by the reply handler installed with `setupWorker`.
 *
 * If preparing or posting the call throws, the promise rejects with that
 * error and the pending entry is removed, so no settler is leaked.
 */
export function execute<T>(worker: Worker, id: string, args: unknown[]): Promise<T> {
  freezeModule(moduleUrlOf(id));
  const callId = nextCallId++;
  return new Promise<T>((resolve, reject) => {
    pending.set(callId, { resolve, reject });
    try {
      const prepared = prepareCall(args);
      worker.postMessage({ callId, id, args: prepared.args }, prepared.transfer as any[]);
    } catch (err) {
      // No reply will ever arrive for this call id.
      pending.delete(callId);
      reject(err);
    }
  });
}

/** Result of `dispatchStream`. */
export interface StreamDispatch<T> {
  /** Values received from the worker, in arrival order. */
  iterable: AsyncIterable<T>;
  /** Resolves once the stream has finished, failed, or been cancelled by the consumer. */
  done: Promise<void>;
}

/**
 * Posts a streaming call for task `id` to `worker` and exposes the values the
 * worker sends back over a dedicated channel as an async iterable.
 *
 * Back-pressure: once `opts.highWaterMark` (default `DEFAULT_HIGH_WATER`)
 * items are buffered, `'pause'` is posted to the producer; `'resume'` is
 * posted when the buffer drains to `LOW_WATER` or fewer.
 *
 * Termination: the stream ends on a `{ done: true }` message, fails on a
 * message carrying `error`, fails if the channel closes before either
 * arrived, and ends when the consumer calls `return()`.
 *
 * @throws Whatever preparing or posting the call throws; the channel is
 *   closed before rethrowing.
 */
export function dispatchStream<T>(
  worker: Worker,
  id: string,
  args: unknown[],
  opts?: ChannelOptions,
): StreamDispatch<T> {
  freezeModule(moduleUrlOf(id));
  const highWater = opts?.highWaterMark ?? DEFAULT_HIGH_WATER;

  const { port1, port2 } = new MessageChannel();

  try {
    const prepared = prepareCall(args);
    const msg = { id, args: prepared.args, port: port2 };
    worker.postMessage(msg, [...prepared.transfer, port2] as any[]);
  } catch (err) {
    // Nothing will ever use the channel; release it before propagating.
    port1.close();
    throw err;
  }

  let resolveDone!: () => void;
  const donePromise = new Promise<void>((r) => {
    resolveDone = r;
  });

  const queue: T[] = [];
  let done = false;
  let error: Error | null = null;
  let paused = false;
  let waiting: (() => void) | null = null;

  // Releases a `next()` call that is parked waiting for data or termination.
  const wake = (): void => {
    const resume = waiting;
    waiting = null;
    resume?.();
  };

  // Single, idempotent termination path shared by every way the stream can end.
  const finish = (err?: Error): void => {
    if (done) return;
    done = true;
    if (err) error = err;
    port1.close();
    resolveDone();
    wake();
  };

  port1.on('message', (msg: { value?: unknown; done?: boolean; error?: Error }) => {
    if (done) return;
    if (msg.error) {
      finish(msg.error);
      return;
    }
    if (msg.done) {
      finish();
      return;
    }
    queue.push(deserializeArg(msg.value) as T);
    wake();
    if (!paused && queue.length >= highWater) {
      paused = true;
      port1.postMessage('pause');
    }
  });

  // Messages posted before a close are delivered before 'close' fires, so
  // reaching this handler while not yet finished means the producer side went
  // away without sending a terminal message.
  port1.on('close', () => {
    finish(new Error('Stream port closed before the stream completed'));
  });
  port1.unref();

  return {
    iterable: {
      [Symbol.asyncIterator]() {
        return {
          async next(): Promise<IteratorResult<T>> {
            while (true) {
              if (queue.length > 0) {
                const value = queue.shift() as T;
                if (paused && queue.length <= LOW_WATER) {
                  paused = false;
                  port1.postMessage('resume');
                }
                return { done: false, value };
              }
              if (error) throw error;
              if (done) return { done: true, value: undefined };
              await new Promise<void>((resolve) => {
                waiting = resolve;
              });
            }
          },
          async return(): Promise<IteratorResult<T>> {
            // Drop buffered items and mark the stream finished so that later
            // (or currently parked) next() calls report completion instead of
            // waiting forever.
            queue.length = 0;
            finish();
            return { done: true, value: undefined };
          },
        };
      },
    },
    done: donePromise,
  };
}