Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: unify all stream paths on StreamHandle + atomics

Remove pipeIterableLegacy, the pipeToPort wrapper, and portToAsyncIterable's
message-based branch. AsyncGenerator and AsyncIterableTask args now produce
a serialized StreamHandle the same way Channel does, and the worker
receives one via portToAsyncIterable's atomics path.

All stream-arg producers now use pipeIterable with a StreamHandle; the
worker always receives a StreamHandle. One code path, one data shape,
no more "legacy vs atomics" branches.

~150 lines removed. Tests 332/332. No bench change observed — the
round-trip stream bottleneck moved to the worker-side passthrough
generator, not the transport. Correctness/consistency win, not a perf
win.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+40 -162
+20 -27
src/execute.ts
··· 9 9 import { AsyncIterableTask } from './stream-task.ts'; 10 10 import { runStreamOnDedicated } from './dedicated-runner.ts'; 11 11 import { CHANNEL, Channel } from './channel.ts'; 12 - import { 13 - pipeIterable, 14 - pipeIterableLegacy, 15 - newPipeFlags, 16 - CANCEL, 17 - DEFAULT_HIGH_WATER, 18 - serializeStreamHandle, 19 - } from './pipe.ts'; 20 - import type { StreamHandle } from './pipe.ts'; 12 + import { pipeIterable, newPipeFlags, CANCEL, DEFAULT_HIGH_WATER, serializeStreamHandle } from './pipe.ts'; 13 + import type { StreamHandle, SerializedStreamHandle } from './pipe.ts'; 21 14 import type { ChannelOptions } from './channel.ts'; 22 15 23 16 let nextCallId = 0; ··· 37 30 }); 38 31 } 39 32 40 - function pipeToPort(iterable: AsyncIterable<unknown>, port: MessagePort, highWaterMark: number): void { 41 - // Legacy raw-port path (pipeToPort is used for AsyncGenerator / AsyncIterableTask 42 - // args that arrive on the worker as MessagePort). Uses message-based backpressure. 43 - void pipeIterableLegacy(iterable, port, { extractTransfers: true, yieldEvery: highWaterMark }); 33 + function isAsyncGenerator(arg: unknown): boolean { 34 + return typeof arg === 'object' && arg !== null && (arg as any)[Symbol.toStringTag] === 'AsyncGenerator'; 44 35 } 45 36 46 - function isAsyncGenerator(arg: unknown): boolean { 47 - return typeof arg === 'object' && arg !== null && (arg as any)[Symbol.toStringTag] === 'AsyncGenerator'; 37 + /** 38 + * Pipes an AsyncIterable from the parent thread to a worker-facing MessagePort. 39 + * Creates a producer-side pipeIterable loop with atomics backpressure and 40 + * returns a serialized consumer-side handle for the worker. 41 + */ 42 + function pipeArgToWorker(iterable: AsyncIterable<unknown>): SerializedStreamHandle { 43 + const { port1, port2 } = new MessageChannel(); 44 + port1.unref(); 45 + const flags = newPipeFlags(); 46 + const producerHandle: StreamHandle = { port: port1, flags, highWater: DEFAULT_HIGH_WATER }; 47 + void pipeIterable(iterable, producerHandle, { extractTransfers: true }); 48 + streamPortStack[streamPortStack.length - 1].push(port2); 49 + return serializeStreamHandle({ port: port2, flags, highWater: DEFAULT_HIGH_WATER }); 48 50 } 49 51 50 52 function prepareArg(arg: unknown): unknown { ··· 54 56 streamPortStack[streamPortStack.length - 1].push(signal as unknown as MessagePort); 55 57 return signal; 56 58 } 57 - // Auto-detect AsyncGenerator args — pipe via MessageChannel 59 + // Auto-detect AsyncGenerator args — pipe via MessageChannel + atomics backpressure 58 60 if (isAsyncGenerator(arg)) { 59 - const { port1, port2 } = new MessageChannel(); 60 - port1.unref(); 61 - pipeToPort(arg as AsyncIterable<unknown>, port1, DEFAULT_HIGH_WATER); 62 - streamPortStack[streamPortStack.length - 1].push(port2); 63 - return port2; 61 + return pipeArgToWorker(arg as AsyncIterable<unknown>); 64 62 } 65 63 // Auto-detect StreamTask args — dispatch to dedicated worker, pipe output 66 64 if (arg instanceof AsyncIterableTask) { 67 - const iterable = runStreamOnDedicated(arg.id, arg.args); 68 - const { port1, port2 } = new MessageChannel(); 69 - port1.unref(); 70 - pipeToPort(iterable, port1, DEFAULT_HIGH_WATER); 71 - streamPortStack[streamPortStack.length - 1].push(port2); 72 - return port2; 65 + return pipeArgToWorker(runStreamOnDedicated(arg.id, arg.args) as AsyncIterable<unknown>); 73 66 } 74 67 // Channel wrapper — single distributor loop + per-consumer atomics backpressure. 75 68 if (arg instanceof Channel) {
-91
src/pipe.ts
··· 125 125 } catch {} 126 126 } 127 127 128 - /** 129 - * Legacy message-based pipe (pause/resume + adaptive setImmediate). Used 130 - * for paths that haven't migrated to atomics — primarily the raw 131 - * AsyncGenerator arg path through pipeToPort. 132 - */ 133 - export async function pipeIterableLegacy<T>( 134 - source: AsyncIterable<T>, 135 - port: MessagePort, 136 - opts: { yieldEvery?: number; extractTransfers?: boolean } = {}, 137 - ): Promise<void> { 138 - const yieldEvery = opts.yieldEvery ?? DEFAULT_HIGH_WATER; 139 - const extract = opts.extractTransfers ?? false; 140 - 141 - let paused = false; 142 - let resumed: (() => void) | null = null; 143 - let cancelled = false; 144 - 145 - port.on('message', (signal: string) => { 146 - if (signal === 'pause') { 147 - paused = true; 148 - } else if (signal === 'resume') { 149 - paused = false; 150 - if (resumed) { 151 - resumed(); 152 - resumed = null; 153 - } 154 - } 155 - }); 156 - 157 - port.on('close', () => { 158 - cancelled = true; 159 - if (resumed) { 160 - resumed(); 161 - resumed = null; 162 - } 163 - }); 164 - 165 - let ticked = false; 166 - setImmediate(() => { 167 - ticked = true; 168 - }); 169 - let streak = 0; 170 - 171 - try { 172 - for await (const value of source) { 173 - if (cancelled) break; 174 - if (paused) 175 - await new Promise<void>((r) => { 176 - resumed = r; 177 - }); 178 - if (cancelled) break; 179 - 180 - let serialized: unknown; 181 - const transferList: Transferable[] = []; 182 - if (extract) { 183 - const extracted = extractTransferables([value]); 184 - serialized = serializeArg(extracted.args[0]); 185 - transferList.push(...extracted.transfer); 186 - collectTransferables(extracted.args[0], transferList); 187 - } else { 188 - serialized = serializeArg(value); 189 - collectTransferables(value, transferList); 190 - } 191 - port.postMessage({ value: serialized, done: false }, transferList); 192 - 193 - streak++; 194 - if (ticked) { 195 - ticked = false; 196 - setImmediate(() => { 197 - ticked = true; 198 - }); 199 - streak = 0; 200 - } else if (streak >= yieldEvery) { 201 - await new Promise((r) => setImmediate(r)); 202 - ticked = false; 203 - setImmediate(() => { 204 - ticked = true; 205 - }); 206 - streak = 0; 207 - } 208 - } 209 - if (!cancelled) port.postMessage({ done: true }); 210 - } catch (err) { 211 - if (!cancelled) { 212 - port.postMessage({ done: true, error: err instanceof Error ? err : new Error(String(err)) }); 213 - } 214 - } 215 - try { 216 - port.close(); 217 - } catch {} 218 - }
+20 -44
src/worker-entry.ts
··· 46 46 return fn; 47 47 } 48 48 49 - function portToAsyncIterable<T>(arg: MessagePort | StreamHandle): AsyncIterable<T> { 50 - const handle: StreamHandle | null = isHandle(arg) ? arg : null; 51 - const port: MessagePort = handle ? handle.port : (arg as MessagePort); 52 - const highWater = handle?.highWater ?? DEFAULT_HIGH_WATER; 53 - const inflight = handle?.flags.fields.inflight; 54 - const state = handle?.flags.fields.state; 55 - const readySignal = handle?.readySignal; 49 + function portToAsyncIterable<T>(handle: StreamHandle): AsyncIterable<T> { 50 + const { port, highWater, readySignal } = handle; 51 + const inflight = handle.flags.fields.inflight; 52 + const state = handle.flags.fields.state; 56 53 57 54 const queue: T[] = []; 58 55 let done = false; 59 56 let error: Error | null = null; 60 57 let waiting: (() => void) | null = null; 61 - 62 - // Legacy message-based backpressure (when handle is null). 63 - let paused = false; 64 58 65 59 port.on('message', (msg: { value?: unknown; done?: boolean; error?: Error }) => { 66 60 if (msg.error) { ··· 85 79 waiting(); 86 80 waiting = null; 87 81 } 88 - if (!handle && !paused && queue.length >= highWater) { 89 - paused = true; 90 - port.postMessage('pause'); 91 - } 92 82 }); 93 83 94 84 return { ··· 98 88 while (true) { 99 89 if (queue.length > 0) { 100 90 const value = queue.shift()!; 101 - if (inflight) { 102 - // Atomics-based backpressure: decrement inflight. In stream 103 - // mode the producer parks on inflight, so we notify it; 104 - // in channel mode the producer parks on readySignal, and 105 - // we only wake it on the cap→below-cap transition. 106 - const prev = inflight.sub(1); 107 - if (readySignal) { 108 - if (prev === highWater) { 109 - readySignal.add(1); 110 - readySignal.notify(); 111 - } 112 - } else { 113 - inflight.notify(); 91 + // Atomics-based backpressure: decrement inflight. In stream 92 + // mode the producer parks on inflight, so we notify it; in 93 + // channel mode the producer parks on readySignal, and we 94 + // only wake it on the cap→below-cap transition. 95 + const prev = inflight.sub(1); 96 + if (readySignal) { 97 + if (prev === highWater) { 98 + readySignal.add(1); 99 + readySignal.notify(); 114 100 } 115 - } else if (paused && queue.length <= 1) { 116 - paused = false; 117 - port.postMessage('resume'); 101 + } else { 102 + inflight.notify(); 118 103 } 119 104 return { done: false, value }; 120 105 } ··· 126 111 } 127 112 }, 128 113 async return(): Promise<IteratorResult<T>> { 129 - if (state && inflight) { 130 - state.store(CANCEL); 131 - inflight.notify(); 132 - if (readySignal) { 133 - readySignal.add(1); 134 - readySignal.notify(); 135 - } 114 + state.store(CANCEL); 115 + inflight.notify(); 116 + if (readySignal) { 117 + readySignal.add(1); 118 + readySignal.notify(); 136 119 } 137 120 port.close(); 138 121 return { done: true, value: undefined }; ··· 142 125 }; 143 126 } 144 127 145 - function isHandle(arg: MessagePort | StreamHandle): arg is StreamHandle { 146 - return typeof arg === 'object' && arg !== null && 'flags' in arg; 147 - } 148 - 149 128 async function resolveArg(arg: unknown): Promise<unknown> { 150 129 if (isSerializedStreamHandle(arg)) { 151 130 return portToAsyncIterable(deserializeStreamHandle(arg)); 152 - } 153 - if (arg instanceof MessagePort) { 154 - return portToAsyncIterable(arg); 155 131 } 156 132 if (isTaskArg(arg)) { 157 133 if (taskCache.has(arg.__task__)) {