Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: plumb highWaterMark through to stream producer

dispatchStream was reading opts.highWaterMark into a local but never
forwarding it to the worker. Worker-side pipeIterable was defaulting
to its own hardcoded 16, so user-supplied highWaterMark had no effect
after the atomics refactor.

Include highWater in the dispatch message; handleStreamTask passes it
to pipeIterable's flags-branch so the producer parks at the requested
threshold.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+13 -3
+1 -1
src/execute.ts
··· 117 117 streamPortStack.push([]); 118 118 const preparedArgs = extracted.args.map(prepareArg); 119 119 const ports = streamPortStack.pop()!; 120 - const msg = { id, args: preparedArgs, port: port2, flags: flags.buffer }; 120 + const msg = { id, args: preparedArgs, port: port2, flags: flags.buffer, highWater }; 121 121 worker.postMessage(msg, [...extracted.transfer, ...ports, port2] as any[]); 122 122 123 123 let resolveDone: () => void;
+12 -2
src/worker-entry.ts
··· 166 166 return deserializeArg(arg); 167 167 } 168 168 169 - type TaskMsg = { callId?: number; id: string; args: unknown[]; port?: MessagePort; flags?: SharedArrayBuffer }; 169 + type TaskMsg = { 170 + callId?: number; 171 + id: string; 172 + args: unknown[]; 173 + port?: MessagePort; 174 + flags?: SharedArrayBuffer; 175 + highWater?: number; 176 + }; 170 177 171 178 function postResult(callId: number, value: unknown): void { 172 179 const returnValue = serializeArg(value); ··· 257 264 resolvedArgs[i] = needsAsyncResolve(a) ? await resolveArg(a) : deserializeArg(a); 258 265 } 259 266 const gen = fn(...resolvedArgs) as AsyncGenerator; 260 - await pipeIterable(gen, port!, { flags: flags ? pipeFlagsFromBuffer(flags) : undefined }); 267 + await pipeIterable(gen, port!, { 268 + flags: flags ? pipeFlagsFromBuffer(flags) : undefined, 269 + highWater: msg.highWater, 270 + }); 261 271 } catch (err) { 262 272 if (callId != null) postError(callId, err); 263 273 }