Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at e343bf0d85181e241d3db15360bb7a080fd71c8d 272 lines 8.6 kB view raw
1import { parentPort, MessagePort } from 'node:worker_threads'; 2import type { Transferable } from 'node:worker_threads'; 3import { registry } from './registry.ts'; 4import { deserializeArg, serializeArg } from './shared/reconstruct.ts'; 5import { collectTransferables } from './transfer.ts'; 6import { pipeIterable, pipeFlagsFromBuffer, CANCEL } from './pipe.ts'; 7import type { PipeFlags } from './pipe.ts'; 8import { Int32Atomic } from './shared/int32-atomic.ts'; 9 10const imported = new Set<string>(); 11const taskCache = new Map<number, unknown>(); 12 13type Fn = (...args: unknown[]) => unknown; 14const fnCache = new Map<string, Fn>(); 15 16function isTaskArg(arg: unknown): arg is { __task__: number; id: string; args: unknown[] } { 17 return typeof arg === 'object' && arg !== null && '__task__' in arg; 18} 19 20function isStreamArg( 21 arg: unknown, 22): arg is { __stream__: true; port: MessagePort; flags: SharedArrayBuffer; readySignal?: SharedArrayBuffer } { 23 return typeof arg === 'object' && arg !== null && (arg as any).__stream__ === true; 24} 25 26function needsAsyncResolve(arg: unknown): boolean { 27 return arg instanceof MessagePort || isStreamArg(arg) || isTaskArg(arg); 28} 29 30// Returns the function synchronously when cached — callers must not await 31// unconditionally, or they pay a microtask hop for every dispatch. 32function resolveFn(id: string): Fn | Promise<Fn> { 33 const cached = fnCache.get(id); 34 if (cached) return cached; 35 return resolveFnSlow(id); 36} 37 38async function resolveFnSlow(id: string): Promise<Fn> { 39 const url = id.slice(0, id.lastIndexOf('#')); 40 if (!imported.has(url)) { 41 await import(url); 42 imported.add(url); 43 } 44 const fn = registry.get(id) as Fn | undefined; 45 if (!fn) throw new Error(`Moroutine not found: ${id}`); 46 fnCache.set(id, fn); 47 return fn; 48} 49 50function portToAsyncIterable<T>(port: MessagePort, flags?: PipeFlags, readySignal?: Int32Atomic): AsyncIterable<T> { 51 const queue: T[] = []; 52 let done = false; 53 let error: Error | null = null; 54 let waiting: (() => void) | null = null; 55 56 // Legacy message-based backpressure (when flags is absent). 57 let paused = false; 58 const HIGH_WATER = 16; 59 60 port.on('message', (msg: { value?: unknown; done?: boolean; error?: Error }) => { 61 if (msg.error) { 62 error = msg.error; 63 done = true; 64 if (waiting) { 65 waiting(); 66 waiting = null; 67 } 68 return; 69 } 70 if (msg.done) { 71 done = true; 72 if (waiting) { 73 waiting(); 74 waiting = null; 75 } 76 return; 77 } 78 queue.push(deserializeArg(msg.value) as T); 79 if (waiting) { 80 waiting(); 81 waiting = null; 82 } 83 if (!flags && !paused && queue.length >= HIGH_WATER) { 84 paused = true; 85 port.postMessage('pause'); 86 } 87 }); 88 89 return { 90 [Symbol.asyncIterator]() { 91 return { 92 async next(): Promise<IteratorResult<T>> { 93 while (true) { 94 if (queue.length > 0) { 95 const value = queue.shift()!; 96 if (flags) { 97 // Atomics-based backpressure: decrement inflight. In stream 98 // mode the producer parks on inflight, so we notify it; 99 // in channel mode the producer parks on readySignal, and 100 // we only wake it on the cap→below-cap transition. 101 const prev = flags.inflight.sub(1); 102 if (readySignal) { 103 if (prev === 16) { 104 readySignal.add(1); 105 readySignal.notify(); 106 } 107 } else { 108 flags.inflight.notify(); 109 } 110 } else if (paused && queue.length <= 1) { 111 paused = false; 112 port.postMessage('resume'); 113 } 114 return { done: false, value }; 115 } 116 if (error) throw error; 117 if (done) return { done: true, value: undefined }; 118 await new Promise<void>((resolve) => { 119 waiting = resolve; 120 }); 121 } 122 }, 123 async return(): Promise<IteratorResult<T>> { 124 if (flags) { 125 flags.state.store(CANCEL); 126 flags.inflight.notify(); 127 if (readySignal) { 128 readySignal.add(1); 129 readySignal.notify(); 130 } 131 } 132 port.close(); 133 return { done: true, value: undefined }; 134 }, 135 }; 136 }, 137 }; 138} 139 140async function resolveArg(arg: unknown): Promise<unknown> { 141 if (isStreamArg(arg)) { 142 const flags = pipeFlagsFromBuffer(arg.flags); 143 const readySignal = arg.readySignal ? new Int32Atomic(arg.readySignal, 0) : undefined; 144 return portToAsyncIterable(arg.port, flags, readySignal); 145 } 146 if (arg instanceof MessagePort) { 147 return portToAsyncIterable(arg); 148 } 149 if (isTaskArg(arg)) { 150 if (taskCache.has(arg.__task__)) { 151 return taskCache.get(arg.__task__); 152 } 153 const inputArgs = arg.args; 154 const resolvedArgs = new Array(inputArgs.length); 155 for (let i = 0; i < inputArgs.length; i++) { 156 const a = inputArgs[i]; 157 resolvedArgs[i] = needsAsyncResolve(a) ? await resolveArg(a) : deserializeArg(a); 158 } 159 const fnM = resolveFn(arg.id); 160 const fn = fnM instanceof Promise ? await fnM : fnM; 161 const raw = fn(...resolvedArgs); 162 const value = raw instanceof Promise ? await raw : raw; 163 taskCache.set(arg.__task__, value); 164 return value; 165 } 166 return deserializeArg(arg); 167} 168 169type TaskMsg = { callId?: number; id: string; args: unknown[]; port?: MessagePort; flags?: SharedArrayBuffer }; 170 171function postResult(callId: number, value: unknown): void { 172 const returnValue = serializeArg(value); 173 const transferList: Transferable[] = []; 174 collectTransferables(value, transferList); 175 parentPort!.postMessage({ callId, value: returnValue }, transferList); 176} 177 178function postError(callId: number, err: unknown): void { 179 parentPort!.postMessage({ 180 callId, 181 error: err instanceof Error ? err : new Error(String(err)), 182 }); 183} 184 185// Value-task path, structured to pay zero microtask hops when the whole 186// chain (fn resolution, arg resolution, function invocation) is sync. 187function handleValueTask(msg: TaskMsg): void { 188 const callId = msg.callId!; 189 try { 190 const fnM = resolveFn(msg.id); 191 if (fnM instanceof Promise) { 192 fnM.then( 193 (fn) => invokeWithArgs(callId, fn, msg.args), 194 (err) => postError(callId, err), 195 ); 196 return; 197 } 198 invokeWithArgs(callId, fnM, msg.args); 199 } catch (err) { 200 postError(callId, err); 201 } 202} 203 204function invokeWithArgs(callId: number, fn: Fn, args: unknown[]): void { 205 try { 206 // Sync prologue: walk args in-place until we hit one that needs async 207 // resolution. If we finish the loop synchronously, go straight to invoke. 208 const resolved = new Array(args.length); 209 let i = 0; 210 for (; i < args.length; i++) { 211 const arg = args[i]; 212 if (needsAsyncResolve(arg)) break; 213 resolved[i] = deserializeArg(arg); 214 } 215 if (i === args.length) { 216 invokeAndRespond(callId, fn, resolved); 217 return; 218 } 219 // Async tail: same loop body, awaiting where needed. 220 (async () => { 221 for (; i < args.length; i++) { 222 const arg = args[i]; 223 resolved[i] = needsAsyncResolve(arg) ? await resolveArg(arg) : deserializeArg(arg); 224 } 225 invokeAndRespond(callId, fn, resolved); 226 })().catch((err) => postError(callId, err)); 227 } catch (err) { 228 postError(callId, err); 229 } 230} 231 232function invokeAndRespond(callId: number, fn: Fn, args: unknown[]): void { 233 try { 234 const raw = fn(...args); 235 if (raw instanceof Promise) { 236 raw.then( 237 (v) => postResult(callId, v), 238 (err) => postError(callId, err), 239 ); 240 return; 241 } 242 postResult(callId, raw); 243 } catch (err) { 244 postError(callId, err); 245 } 246} 247 248async function handleStreamTask(msg: TaskMsg): Promise<void> { 249 const { id, args, port, flags } = msg; 250 const callId = msg.callId; 251 try { 252 const fnM = resolveFn(id); 253 const fn = fnM instanceof Promise ? await fnM : fnM; 254 const resolvedArgs = new Array(args.length); 255 for (let i = 0; i < args.length; i++) { 256 const a = args[i]; 257 resolvedArgs[i] = needsAsyncResolve(a) ? await resolveArg(a) : deserializeArg(a); 258 } 259 const gen = fn(...resolvedArgs) as AsyncGenerator; 260 await pipeIterable(gen, port!, { flags: flags ? pipeFlagsFromBuffer(flags) : undefined }); 261 } catch (err) { 262 if (callId != null) postError(callId, err); 263 } 264} 265 266parentPort!.on('message', (msg: TaskMsg) => { 267 if (msg.port) { 268 void handleStreamTask(msg); 269 } else { 270 handleValueTask(msg); 271 } 272});