Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

perf(worker): avoid microtask hops on fully-sync task dispatch

Previous optimization cached the fn + added a sync fast-path for
resolveArgs, but still awaited them unconditionally — which costs a
microtask hop even when the callee returns synchronously.

This change threads "T | Promise<T>" through the hot path and uses
`instanceof Promise` checks to only await when there's actually
something to wait for:

- resolveFn returns a Fn directly when cached
- resolveArgs builds the result array imperatively, switching to an
async helper only at the first arg that needs async resolution
- invokeAndRespond checks whether the moroutine's return value is a
Promise before awaiting it
- the 'message' listener is no longer async; value-task handling runs
fully synchronously when nothing in the chain returns a Promise

Streaming tasks use their own path (handleStreamTask) which remains
async — they're inherently long-lived and the async overhead doesn't
dominate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+157 -64
+157 -64
src/worker-entry.ts
··· 6 6 7 7 const imported = new Set<string>(); 8 8 const taskCache = new Map<number, unknown>(); 9 - const fnCache = new Map<string, (...args: unknown[]) => unknown>(); 9 + 10 + type Fn = (...args: unknown[]) => unknown; 11 + const fnCache = new Map<string, Fn>(); 10 12 11 13 function isTaskArg(arg: unknown): arg is { __task__: number; id: string; args: unknown[] } { 12 14 return typeof arg === 'object' && arg !== null && '__task__' in arg; 13 15 } 14 16 15 - async function resolveFn(id: string): Promise<(...args: unknown[]) => unknown> { 17 + // Returns the function synchronously when cached — callers must not await 18 + // unconditionally, or they pay a microtask hop for every dispatch. 19 + function resolveFn(id: string): Fn | Promise<Fn> { 16 20 const cached = fnCache.get(id); 17 21 if (cached) return cached; 22 + return resolveFnSlow(id); 23 + } 24 + 25 + async function resolveFnSlow(id: string): Promise<Fn> { 18 26 const url = id.slice(0, id.lastIndexOf('#')); 19 27 if (!imported.has(url)) { 20 28 await import(url); 21 29 imported.add(url); 22 30 } 23 - const fn = registry.get(id) as ((...args: unknown[]) => unknown) | undefined; 31 + const fn = registry.get(id) as Fn | undefined; 24 32 if (!fn) throw new Error(`Moroutine not found: ${id}`); 25 33 fnCache.set(id, fn); 26 34 return fn; ··· 93 101 }; 94 102 } 95 103 96 - function needsAsyncResolve(arg: unknown): boolean { 97 - return arg instanceof MessagePort || isTaskArg(arg); 104 + // Builds the resolved args array imperatively. Returns a plain array when 105 + // every arg can be resolved synchronously; only switches to a Promise at 106 + // the first arg that needs async resolution. This matters because awaiting 107 + // a plain array still costs a microtask hop. 108 + function resolveArgs(args: unknown[]): unknown[] | Promise<unknown[]> { 109 + const result: unknown[] = new Array(args.length); 110 + for (let i = 0; i < args.length; i++) { 111 + const arg = args[i]; 112 + if (arg instanceof MessagePort || isTaskArg(arg)) { 113 + return resolveArgsAsync(args, result, i); 114 + } 115 + result[i] = deserializeArg(arg); 116 + } 117 + return result; 98 118 } 99 119 100 - function resolveArgs(args: unknown[]): unknown[] | Promise<unknown[]> { 101 - return args.some(needsAsyncResolve) ? Promise.all(args.map(resolveArg)) : args.map(deserializeArg); 120 + async function resolveArgsAsync(args: unknown[], partial: unknown[], from: number): Promise<unknown[]> { 121 + for (let i = from; i < args.length; i++) { 122 + partial[i] = await resolveArg(args[i]); 123 + } 124 + return partial; 102 125 } 103 126 104 127 async function resolveArg(arg: unknown): Promise<unknown> { ··· 109 132 if (taskCache.has(arg.__task__)) { 110 133 return taskCache.get(arg.__task__); 111 134 } 112 - const resolvedArgs = await resolveArgs(arg.args); 113 - const fn = await resolveFn(arg.id); 114 - const value = await fn(...resolvedArgs); 135 + const argsM = resolveArgs(arg.args); 136 + const resolvedArgs = argsM instanceof Promise ? await argsM : argsM; 137 + const fnM = resolveFn(arg.id); 138 + const fn = fnM instanceof Promise ? await fnM : fnM; 139 + const raw = fn(...resolvedArgs); 140 + const value = raw instanceof Promise ? await raw : raw; 115 141 taskCache.set(arg.__task__, value); 116 142 return value; 117 143 } 118 144 return deserializeArg(arg); 119 145 } 120 146 121 - parentPort!.on('message', async (msg: { callId?: number; id: string; args: unknown[]; port?: MessagePort }) => { 122 - const { id, args, port } = msg; 147 + type TaskMsg = { callId?: number; id: string; args: unknown[]; port?: MessagePort }; 148 + 149 + function postResult(callId: number, value: unknown): void { 150 + const returnValue = serializeArg(value); 151 + const transferList: Transferable[] = []; 152 + collectTransferables(value, transferList); 153 + parentPort!.postMessage({ callId, value: returnValue }, transferList); 154 + } 155 + 156 + function postError(callId: number, err: unknown): void { 157 + parentPort!.postMessage({ 158 + callId, 159 + error: err instanceof Error ? err : new Error(String(err)), 160 + }); 161 + } 162 + 163 + // Value-task path, structured to pay zero microtask hops when the whole 164 + // chain (fn resolution, arg resolution, function invocation) is sync. 165 + function handleValueTask(msg: TaskMsg): void { 166 + const callId = msg.callId!; 167 + try { 168 + const fnM = resolveFn(msg.id); 169 + if (fnM instanceof Promise) { 170 + fnM.then( 171 + (fn) => invokeWithArgs(callId, fn, msg.args), 172 + (err) => postError(callId, err), 173 + ); 174 + return; 175 + } 176 + invokeWithArgs(callId, fnM, msg.args); 177 + } catch (err) { 178 + postError(callId, err); 179 + } 180 + } 181 + 182 + function invokeWithArgs(callId: number, fn: Fn, args: unknown[]): void { 123 183 try { 124 - const fn = await resolveFn(id); 184 + const argsM = resolveArgs(args); 185 + if (argsM instanceof Promise) { 186 + argsM.then( 187 + (resolved) => invokeAndRespond(callId, fn, resolved), 188 + (err) => postError(callId, err), 189 + ); 190 + return; 191 + } 192 + invokeAndRespond(callId, fn, argsM); 193 + } catch (err) { 194 + postError(callId, err); 195 + } 196 + } 125 197 126 - const resolvedArgs = await resolveArgs(args); 198 + function invokeAndRespond(callId: number, fn: Fn, args: unknown[]): void { 199 + try { 200 + const raw = fn(...args); 201 + if (raw instanceof Promise) { 202 + raw.then( 203 + (v) => postResult(callId, v), 204 + (err) => postError(callId, err), 205 + ); 206 + return; 207 + } 208 + postResult(callId, raw); 209 + } catch (err) { 210 + postError(callId, err); 211 + } 212 + } 127 213 128 - if (port) { 129 - let paused = false; 130 - let resumed: (() => void) | null = null; 131 - let cancelled = false; 214 + async function handleStreamTask(msg: TaskMsg): Promise<void> { 215 + const { id, args, port } = msg; 216 + const callId = msg.callId; 217 + try { 218 + const fnM = resolveFn(id); 219 + const fn = fnM instanceof Promise ? await fnM : fnM; 220 + const argsM = resolveArgs(args); 221 + const resolvedArgs = argsM instanceof Promise ? await argsM : argsM; 132 222 133 - port.on('message', (signal: string) => { 134 - if (signal === 'pause') { 135 - paused = true; 136 - } else if (signal === 'resume') { 137 - paused = false; 138 - if (resumed) { 139 - resumed(); 140 - resumed = null; 141 - } 142 - } 143 - }); 223 + let paused = false; 224 + let resumed: (() => void) | null = null; 225 + let cancelled = false; 144 226 145 - port.on('close', () => { 146 - cancelled = true; 227 + port!.on('message', (signal: string) => { 228 + if (signal === 'pause') { 229 + paused = true; 230 + } else if (signal === 'resume') { 231 + paused = false; 147 232 if (resumed) { 148 233 resumed(); 149 234 resumed = null; 150 235 } 151 - }); 236 + } 237 + }); 152 238 153 - try { 154 - const gen = fn(...resolvedArgs) as AsyncGenerator; 155 - for await (const value of gen) { 156 - if (cancelled) break; 157 - if (paused) 158 - await new Promise<void>((r) => { 159 - resumed = r; 160 - }); 161 - if (cancelled) break; 162 - const serialized = serializeArg(value); 163 - const transferList: Transferable[] = []; 164 - collectTransferables(value, transferList); 165 - port.postMessage({ value: serialized, done: false }, transferList); 166 - await new Promise((r) => setImmediate(r)); 167 - } 168 - if (!cancelled) port.postMessage({ done: true }); 169 - } catch (err) { 170 - if (!cancelled) { 171 - port.postMessage({ done: true, error: err instanceof Error ? err : new Error(String(err)) }); 172 - } 239 + port!.on('close', () => { 240 + cancelled = true; 241 + if (resumed) { 242 + resumed(); 243 + resumed = null; 244 + } 245 + }); 246 + 247 + try { 248 + const gen = fn(...resolvedArgs) as AsyncGenerator; 249 + for await (const value of gen) { 250 + if (cancelled) break; 251 + if (paused) 252 + await new Promise<void>((r) => { 253 + resumed = r; 254 + }); 255 + if (cancelled) break; 256 + const serialized = serializeArg(value); 257 + const transferList: Transferable[] = []; 258 + collectTransferables(value, transferList); 259 + port!.postMessage({ value: serialized, done: false }, transferList); 260 + await new Promise((r) => setImmediate(r)); 261 + } 262 + if (!cancelled) port!.postMessage({ done: true }); 263 + } catch (err) { 264 + if (!cancelled) { 265 + port!.postMessage({ done: true, error: err instanceof Error ? err : new Error(String(err)) }); 173 266 } 174 - try { 175 - port.close(); 176 - } catch {} 177 - return; 178 267 } 268 + try { 269 + port!.close(); 270 + } catch {} 271 + } catch (err) { 272 + if (callId != null) postError(callId, err); 273 + } 274 + } 179 275 180 - const callId = msg.callId!; 181 - const value = await fn(...resolvedArgs); 182 - const returnValue = serializeArg(value); 183 - const transferList: Transferable[] = []; 184 - collectTransferables(value, transferList); 185 - parentPort!.postMessage({ callId, value: returnValue }, transferList); 186 - } catch (err) { 187 - parentPort!.postMessage({ callId: msg.callId!, error: err instanceof Error ? err : new Error(String(err)) }); 276 + parentPort!.on('message', (msg: TaskMsg) => { 277 + if (msg.port) { 278 + void handleStreamTask(msg); 279 + } else { 280 + handleValueTask(msg); 188 281 } 189 282 });