// Offload functions to worker threads with shared memory primitives for Node.js.
// 8
// fork
//
// Configure Feed
//
// Select the types of activity you want to include in your feed.
//
// at main 255 lines 7.8 kB view raw
import { parentPort, MessagePort } from 'node:worker_threads';
import type { Transferable } from 'node:worker_threads';
import { registry } from './registry.ts';
import { deserializeArg, serializeArg } from './shared/reconstruct.ts';
import { collectTransferables } from './transfer.ts';
import {
  pipeIterable,
  CANCEL,
  DEFAULT_HIGH_WATER,
  deserializeStreamHandle,
  isSerializedStreamHandle,
} from './pipe.ts';
import type { StreamHandle, SerializedStreamHandle } from './pipe.ts';

/** Module URLs that have already been dynamically imported by this worker. */
const imported = new Set<string>();

/** Values produced by task-valued arguments, keyed by their `__task__` number. */
const taskCache = new Map<number, unknown>();

/** Shape of every callable looked up in the registry. */
type Fn = (...args: unknown[]) => unknown;

/** Functions already resolved from the registry, keyed by their full id. */
const fnCache = new Map<string, Fn>();

/**
 * Type guard for a task reference passed as an argument.
 *
 * Only the presence of the `__task__` key is checked; the `id` and `args`
 * fields named in the narrowed type are assumed, not verified.
 */
function isTaskArg(arg: unknown): arg is { __task__: number; id: string; args: unknown[] } {
  return typeof arg === 'object' && arg !== null && '__task__' in arg;
}

/**
 * Reports whether an argument cannot be handled by a plain `deserializeArg`
 * call: a `MessagePort` instance, a serialized stream handle, or a task
 * reference.
 */
function needsAsyncResolve(arg: unknown): boolean {
  return arg instanceof MessagePort || isSerializedStreamHandle(arg) || isTaskArg(arg);
}

// Returns the function synchronously when cached — callers must not await
// unconditionally, or they pay a microtask hop for every dispatch.
/**
 * Looks up the function registered under `id`.
 *
 * A cache hit is returned directly (not wrapped in a promise); a miss is
 * delegated to `resolveFnSlow`. Callers branch on `instanceof Promise`
 * instead of awaiting unconditionally so the cached path stays synchronous.
 */
function resolveFn(id: string): Fn | Promise<Fn> {
  const cached = fnCache.get(id);
  if (cached) return cached;
  return resolveFnSlow(id);
}

/**
 * Cache-miss path of `resolveFn`.
 *
 * Imports the module named by the part of `id` before its last `#` (at most
 * once per URL), then reads the function from `registry` and caches it.
 * The import result itself is discarded; presumably importing the module is
 * what populates `registry` — confirm against `registry.ts`.
 *
 * @throws Error when `id` contains no `#`, or when nothing is registered
 *   under `id` after the import.
 */
async function resolveFnSlow(id: string): Promise<Fn> {
  const hashIndex = id.lastIndexOf('#');
  if (hashIndex === -1) {
    // Without this guard, slice(0, -1) would silently drop the last character
    // of the id and import the wrong specifier.
    throw new Error(`Invalid moroutine id (missing '#'): ${id}`);
  }
  const url = id.slice(0, hashIndex);
  if (!imported.has(url)) {
    await import(url);
    imported.add(url);
  }
  const fn = registry.get(id) as Fn | undefined;
  if (!fn) throw new Error(`Moroutine not found: ${id}`);
  fnCache.set(id, fn);
  return fn;
}

/**
 * Adapts the receiving side of a stream handle into an `AsyncIterable`.
 *
 * Messages arriving on `handle.port` are read as `{ value }`, `{ done }` or
 * `{ error }`. Values are deserialized and buffered; `next()` drains the
 * buffer first, then surfaces a stored error, then reports completion, and
 * otherwise parks until the next message arrives.
 *
 * The message listener and buffer are created once per call to this function,
 * so every iterator obtained from the returned iterable shares them.
 */
function portToAsyncIterable<T>(handle: StreamHandle): AsyncIterable<T> {
  const { port, highWater, readySignal } = handle;
  const inflight = handle.flags.fields.inflight;
  const state = handle.flags.fields.state;

  const queue: T[] = [];
  let done = false;
  let error: Error | null = null;
  // Resolver of the promise a parked `next()` is awaiting, if any.
  let waiting: (() => void) | null = null;

  /** Releases a parked `next()` call, if there is one. */
  const wake = (): void => {
    const resolve = waiting;
    if (resolve) {
      waiting = null;
      resolve();
    }
  };

  port.on('message', (msg: { value?: unknown; done?: boolean; error?: Error }) => {
    if (msg.error) {
      error = msg.error;
      done = true;
    } else if (msg.done) {
      done = true;
    } else {
      queue.push(deserializeArg(msg.value) as T);
    }
    wake();
  });

  return {
    [Symbol.asyncIterator]() {
      return {
        async next(): Promise<IteratorResult<T>> {
          while (true) {
            if (queue.length > 0) {
              const value = queue.shift()!;
              // Atomics-based backpressure: decrement inflight. In stream
              // mode the producer parks on inflight, so we notify it; in
              // channel mode the producer parks on readySignal, and we
              // only wake it on the cap→below-cap transition.
              const prev = inflight.sub(1);
              if (readySignal) {
                if (prev === highWater) {
                  readySignal.add(1);
                  readySignal.notify();
                }
              } else {
                inflight.notify();
              }
              return { done: false, value };
            }
            if (error) throw error;
            if (done) return { done: true, value: undefined };
            await new Promise<void>((resolve) => {
              waiting = resolve;
            });
          }
        },
        async return(): Promise<IteratorResult<T>> {
          // Flag cancellation, then wake the producer on whichever signal it
          // may be parked on so it can observe the flag.
          state.store(CANCEL);
          inflight.notify();
          if (readySignal) {
            readySignal.add(1);
            readySignal.notify();
          }
          port.close();
          // No further messages can arrive once the port is closed. Mark the
          // stream finished and release any parked `next()` so it does not
          // wait forever.
          done = true;
          wake();
          return { done: true, value: undefined };
        },
      };
    },
  };
}

/**
 * Resolves one argument that `needsAsyncResolve` flagged (other values fall
 * through to `deserializeArg`).
 *
 * - Serialized stream handle: becomes an async iterable over the stream.
 * - Task reference: its own args are resolved recursively, the referenced
 *   function is invoked, and the awaited result is cached under `__task__`.
 *
 * NOTE(review): only completed results are cached, so two overlapping
 * resolutions of the same `__task__` would each run the function.
 */
async function resolveArg(arg: unknown): Promise<unknown> {
  if (isSerializedStreamHandle(arg)) {
    return portToAsyncIterable(deserializeStreamHandle(arg));
  }
  if (isTaskArg(arg)) {
    if (taskCache.has(arg.__task__)) {
      return taskCache.get(arg.__task__);
    }
    const inputArgs = arg.args;
    const resolvedArgs = new Array(inputArgs.length);
    for (let i = 0; i < inputArgs.length; i++) {
      const a = inputArgs[i];
      resolvedArgs[i] = needsAsyncResolve(a) ? await resolveArg(a) : deserializeArg(a);
    }
    const fnM = resolveFn(arg.id);
    const fn = fnM instanceof Promise ? await fnM : fnM;
    const raw = fn(...resolvedArgs);
    const value = raw instanceof Promise ? await raw : raw;
    taskCache.set(arg.__task__, value);
    return value;
  }
  return deserializeArg(arg);
}

/** Message received from the parent thread describing one dispatch. */
type TaskMsg = {
  callId?: number;
  id: string;
  args: unknown[];
  /** Present on streaming dispatches; bundles port + flags + highWater. */
  stream?: SerializedStreamHandle;
};

/**
 * Posts a successful result for `callId` to the parent, passing along any
 * transferables collected from the value.
 */
function postResult(callId: number, value: unknown): void {
  const returnValue = serializeArg(value);
  const transferList: Transferable[] = [];
  collectTransferables(value, transferList);
  parentPort!.postMessage({ callId, value: returnValue }, transferList);
}

/**
 * Posts a failure for `callId` to the parent. Non-`Error` values are wrapped
 * in an `Error` built from their string form.
 */
function postError(callId: number, err: unknown): void {
  parentPort!.postMessage({
    callId,
    error: err instanceof Error ? err : new Error(String(err)),
  });
}

// Value-task path, structured to pay zero microtask hops when the whole
// chain (fn resolution, arg resolution, function invocation) is sync.
function handleValueTask(msg: TaskMsg): void {
  const callId = msg.callId!;
  try {
    const fnM = resolveFn(msg.id);
    if (fnM instanceof Promise) {
      fnM.then(
        (fn) => invokeWithArgs(callId, fn, msg.args),
        (err) => postError(callId, err),
      );
      return;
    }
    invokeWithArgs(callId, fnM, msg.args);
  } catch (err) {
    postError(callId, err);
  }
}

/**
 * Resolves `args` and hands them to `invokeAndRespond`.
 *
 * Stays fully synchronous when no argument needs async resolution; otherwise
 * the remaining arguments are resolved in order inside an async continuation.
 * Any failure is reported to the parent through `postError`.
 */
function invokeWithArgs(callId: number, fn: Fn, args: unknown[]): void {
  try {
    // Sync prologue: walk args in-place until we hit one that needs async
    // resolution. If we finish the loop synchronously, go straight to invoke.
    const resolved = new Array(args.length);
    let i = 0;
    for (; i < args.length; i++) {
      const arg = args[i];
      if (needsAsyncResolve(arg)) break;
      resolved[i] = deserializeArg(arg);
    }
    if (i === args.length) {
      invokeAndRespond(callId, fn, resolved);
      return;
    }
    // Async tail: same loop body, awaiting where needed.
    (async () => {
      for (; i < args.length; i++) {
        const arg = args[i];
        resolved[i] = needsAsyncResolve(arg) ? await resolveArg(arg) : deserializeArg(arg);
      }
      invokeAndRespond(callId, fn, resolved);
    })().catch((err) => postError(callId, err));
  } catch (err) {
    postError(callId, err);
  }
}

/**
 * Calls `fn` with the resolved arguments and posts its outcome for `callId`:
 * the value (awaited when `fn` returns a promise) on success, or the error
 * when the call throws, the promise rejects, or posting the result fails.
 */
function invokeAndRespond(callId: number, fn: Fn, args: unknown[]): void {
  try {
    const raw = fn(...args);
    if (raw instanceof Promise) {
      // `.then(ok).catch(fail)` rather than `.then(ok, fail)`: a throw from
      // postResult (serialization or postMessage failure) must also reach
      // postError, matching what the try/catch does for the sync path.
      raw
        .then((v) => postResult(callId, v))
        .catch((err) => postError(callId, err));
      return;
    }
    postResult(callId, raw);
  } catch (err) {
    postError(callId, err);
  }
}

/**
 * Streaming dispatch: resolves the function and its arguments, invokes it,
 * and pipes the returned generator into the stream handle carried by
 * `msg.stream`.
 *
 * NOTE(review): a failure is reported only when the message carries a
 * `callId`; without one it is dropped here. Confirm whether the consumer is
 * told about such failures some other way.
 */
async function handleStreamTask(msg: TaskMsg): Promise<void> {
  const { id, args } = msg;
  const callId = msg.callId;
  try {
    const fnM = resolveFn(id);
    const fn = fnM instanceof Promise ? await fnM : fnM;
    const resolvedArgs = new Array(args.length);
    for (let i = 0; i < args.length; i++) {
      const a = args[i];
      resolvedArgs[i] = needsAsyncResolve(a) ? await resolveArg(a) : deserializeArg(a);
    }
    const gen = fn(...resolvedArgs) as AsyncGenerator;
    await pipeIterable(gen, deserializeStreamHandle(msg.stream!));
  } catch (err) {
    if (callId != null) postError(callId, err);
  }
}

// Entry point: messages carrying a stream handle take the streaming path,
// everything else takes the value path.
parentPort!.on('message', (msg: TaskMsg) => {
  if (msg.stream) {
    void handleStreamTask(msg);
  } else {
    handleValueTask(msg);
  }
});