Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

perf(worker): inline arg-resolution loops; adaptive yield for streaming

Two cleanups on worker-entry.ts:

1. Arg resolution now lives as a small loop at each call site (value
task, stream task, and recursive task-arg) instead of behind a
resolveArgs helper that returned unknown[] | Promise<unknown[]>.
A `needsAsyncResolve(arg)` helper DRYs up the
`MessagePort | task-arg` check. The hot-path (value task) keeps the
sync-prologue + async-tail structure inline so fully-sync args
don't enter async mode at all.

2. Streaming emit loop uses adaptive setImmediate instead of yielding
after every emit. Arms a setImmediate whose callback flips a
`ticked` flag; if the generator's own awaits naturally tick the
loop between emits the flag flips and we skip the forced yield,
otherwise a streak of YIELD_EVERY (=16) emits without a tick
triggers a forced yield so pause/close can reach us.

Net effect: pure-CPU generators jump from ~40K items/s to ~85K (the
fixed-yield-every-16 ceiling); I/O-backed generators should approach
the no-yield ceiling (~489K) because their awaits tick naturally.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+63 -35
+63 -35
src/worker-entry.ts
··· 14 14 return typeof arg === 'object' && arg !== null && '__task__' in arg; 15 15 } 16 16 17 + function needsAsyncResolve(arg: unknown): boolean { 18 + return arg instanceof MessagePort || isTaskArg(arg); 19 + } 20 + 17 21 // Returns the function synchronously when cached — callers must not await 18 22 // unconditionally, or they pay a microtask hop for every dispatch. 19 23 function resolveFn(id: string): Fn | Promise<Fn> { ··· 101 105 }; 102 106 } 103 107 104 - // Builds the resolved args array imperatively. Returns a plain array when 105 - // every arg can be resolved synchronously; only switches to a Promise at 106 - // the first arg that needs async resolution. This matters because awaiting 107 - // a plain array still costs a microtask hop. 108 - function resolveArgs(args: unknown[]): unknown[] | Promise<unknown[]> { 109 - const result: unknown[] = new Array(args.length); 110 - for (let i = 0; i < args.length; i++) { 111 - const arg = args[i]; 112 - if (arg instanceof MessagePort || isTaskArg(arg)) { 113 - return resolveArgsAsync(args, result, i); 114 - } 115 - result[i] = deserializeArg(arg); 116 - } 117 - return result; 118 - } 119 - 120 - async function resolveArgsAsync(args: unknown[], partial: unknown[], from: number): Promise<unknown[]> { 121 - for (let i = from; i < args.length; i++) { 122 - partial[i] = await resolveArg(args[i]); 123 - } 124 - return partial; 125 - } 126 - 127 108 async function resolveArg(arg: unknown): Promise<unknown> { 128 109 if (arg instanceof MessagePort) { 129 110 return portToAsyncIterable(arg); ··· 132 113 if (taskCache.has(arg.__task__)) { 133 114 return taskCache.get(arg.__task__); 134 115 } 135 - const argsM = resolveArgs(arg.args); 136 - const resolvedArgs = argsM instanceof Promise ? await argsM : argsM; 116 + const inputArgs = arg.args; 117 + const resolvedArgs = new Array(inputArgs.length); 118 + for (let i = 0; i < inputArgs.length; i++) { 119 + const a = inputArgs[i]; 120 + resolvedArgs[i] = needsAsyncResolve(a) ? await resolveArg(a) : deserializeArg(a); 121 + } 137 122 const fnM = resolveFn(arg.id); 138 123 const fn = fnM instanceof Promise ? await fnM : fnM; 139 124 const raw = fn(...resolvedArgs); ··· 181 166 182 167 function invokeWithArgs(callId: number, fn: Fn, args: unknown[]): void { 183 168 try { 184 - const argsM = resolveArgs(args); 185 - if (argsM instanceof Promise) { 186 - argsM.then( 187 - (resolved) => invokeAndRespond(callId, fn, resolved), 188 - (err) => postError(callId, err), 189 - ); 169 + // Sync prologue: walk args in-place until we hit one that needs async 170 + // resolution. If we finish the loop synchronously, go straight to invoke. 171 + const resolved = new Array(args.length); 172 + let i = 0; 173 + for (; i < args.length; i++) { 174 + const arg = args[i]; 175 + if (needsAsyncResolve(arg)) break; 176 + resolved[i] = deserializeArg(arg); 177 + } 178 + if (i === args.length) { 179 + invokeAndRespond(callId, fn, resolved); 190 180 return; 191 181 } 192 - invokeAndRespond(callId, fn, argsM); 182 + // Async tail: same loop body, awaiting where needed. 183 + (async () => { 184 + for (; i < args.length; i++) { 185 + const arg = args[i]; 186 + resolved[i] = needsAsyncResolve(arg) ? await resolveArg(arg) : deserializeArg(arg); 187 + } 188 + invokeAndRespond(callId, fn, resolved); 189 + })().catch((err) => postError(callId, err)); 193 190 } catch (err) { 194 191 postError(callId, err); 195 192 } ··· 217 214 try { 218 215 const fnM = resolveFn(id); 219 216 const fn = fnM instanceof Promise ? await fnM : fnM; 220 - const argsM = resolveArgs(args); 221 - const resolvedArgs = argsM instanceof Promise ? await argsM : argsM; 217 + const resolvedArgs = new Array(args.length); 218 + for (let i = 0; i < args.length; i++) { 219 + const a = args[i]; 220 + resolvedArgs[i] = needsAsyncResolve(a) ? await resolveArg(a) : deserializeArg(a); 221 + } 222 222 223 223 let paused = false; 224 224 let resumed: (() => void) | null = null; ··· 244 244 } 245 245 }); 246 246 247 + // Adaptive yield: arm a setImmediate whose callback flips a flag. 248 + // If the generator's natural awaits tick the event loop between emits, 249 + // the flag flips on its own and we never force a yield — parent-side 250 + // pause/close messages arrive via the loop's normal message delivery. 251 + // If the generator is purely synchronous (no natural ticks), the flag 252 + // stays unflipped and once `streak` hits YIELD_EVERY we force a yield 253 + // so pause/close can actually reach us. 254 + const YIELD_EVERY = 16; 255 + let ticked = false; 256 + setImmediate(() => { 257 + ticked = true; 258 + }); 259 + let streak = 0; 260 + 247 261 try { 248 262 const gen = fn(...resolvedArgs) as AsyncGenerator; 249 263 for await (const value of gen) { ··· 257 271 const transferList: Transferable[] = []; 258 272 collectTransferables(value, transferList); 259 273 port!.postMessage({ value: serialized, done: false }, transferList); 260 - await new Promise((r) => setImmediate(r)); 274 + streak++; 275 + if (ticked) { 276 + ticked = false; 277 + setImmediate(() => { 278 + ticked = true; 279 + }); 280 + streak = 0; 281 + } else if (streak >= YIELD_EVERY) { 282 + await new Promise((r) => setImmediate(r)); 283 + ticked = false; 284 + setImmediate(() => { 285 + ticked = true; 286 + }); 287 + streak = 0; 288 + } 261 289 } 262 290 if (!cancelled) port!.postMessage({ done: true }); 263 291 } catch (err) {