Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: StreamHandle envelope + SharedStruct-backed PipeFlags

Unify how "one streaming endpoint" is represented and transported:

- PipeFlags is now a SharedStruct<{inflight, state}> allocated via
moroutine's own shared({...}) primitive. No more hand-rolled byte
offsets or manual Int32Atomic construction over a raw SAB.
- StreamHandle is a first-class object carrying {port, flags,
readySignal?, highWater} — the complete description of one end of
a backpressured stream.
- serializeStreamHandle / deserializeStreamHandle / isSerializedStreamHandle
round-trip a handle through postMessage using the existing
serializeArg/deserializeArg machinery. No custom "rebuild from
buffer" helper — the shared-primitive layer already does this.

Callers no longer juggle free-floating {port, flags: SAB, readySignal:
SAB, highWater: number}; they pass a StreamHandle. dispatchStream's
message now has a single `stream` field; Channel.addConsumer returns
a StreamHandle directly; prepareArg serializes it for transport.

Behavior and throughput unchanged. 332/332 tests pass; channel
fan-out and stream numbers within noise of pre-refactor.

Renamed: moved the old "legacy" pause/resume + adaptive-yield pipe
to pipeIterableLegacy (still used by pipeToPort for AsyncGenerator
args that haven't migrated to atomics).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+152 -144
+23 -34
src/channel.ts
··· 2 2 import type { MessagePort, Transferable } from 'node:worker_threads'; 3 3 import { AsyncIterableTask } from './stream-task.ts'; 4 4 import { runStreamOnDedicated } from './dedicated-runner.ts'; 5 - import { newPipeFlags, CANCEL } from './pipe.ts'; 6 - import type { PipeFlags } from './pipe.ts'; 5 + import { newPipeFlags, CANCEL, DEFAULT_HIGH_WATER } from './pipe.ts'; 6 + import type { PipeFlags, StreamHandle } from './pipe.ts'; 7 7 import { Int32Atomic } from './shared/int32-atomic.ts'; 8 8 import { serializeArg } from './shared/reconstruct.ts'; 9 9 import { collectTransferables } from './transfer.ts'; 10 10 11 11 const CHANNEL = Symbol.for('moroutine.channel'); 12 - const DEFAULT_HIGH_WATER = 16; 13 12 14 13 /** Options for configuring a channel. */ 15 14 export interface ChannelOptions { 16 - /** Max number of items any one consumer can have in-flight before the 17 - * distributor skips it and gives items to other ready consumers. 18 - * Defaults to 16. */ 15 + /** Max items any one consumer can have in-flight before the distributor 16 + * skips it and gives items to other ready consumers. Defaults to 16. */ 19 17 highWaterMark?: number; 20 18 } 21 19 ··· 28 26 * One-producer, many-consumer source with atomics-based backpressure. 29 27 * 30 28 * A single `tryPull` loop iterates the shared source and dispatches each 31 - * item to the first consumer whose `inflight` atomic is below HIGH_WATER. 29 + * item to the first consumer whose `inflight` atomic is below highWater. 32 30 * When every consumer is at cap, the loop parks on a shared `readySignal` 33 31 * atomic until any consumer pulls from its port (which decrements its 34 32 * inflight and bumps readySignal, waking the loop). ··· 36 34 export class Channel<T> { 37 35 private readonly iter: AsyncIterator<T>; 38 36 private readonly consumers: Consumer[] = []; 39 - private readonly readySignalBuf: SharedArrayBuffer = new SharedArrayBuffer(4); 40 - private readonly readySignal: Int32Atomic = new Int32Atomic(this.readySignalBuf, 0); 37 + private readonly readySignal: Int32Atomic = new Int32Atomic(); 41 38 private readonly highWater: number; 42 39 private pulling = false; 43 40 private done = false; ··· 53 50 this.iter = src[Symbol.asyncIterator](); 54 51 } 55 52 56 - /** Per-consumer handle. Returns the worker-facing port, the consumer's 57 - * private flags (inflight + state), the channel-wide readySignal, and 58 - * the channel's highWaterMark (so the worker knows when to signal the 59 - * cap→below-cap transition). */ 60 - addConsumer(): { 61 - port: MessagePort; 62 - flags: SharedArrayBuffer; 63 - readySignal: SharedArrayBuffer; 64 - highWater: number; 65 - } { 53 + /** Per-consumer stream handle — complete description of this consumer's 54 + * producer-facing side of the channel. */ 55 + addConsumer(): StreamHandle { 66 56 const { port1, port2 } = new MessageChannel(); 67 57 port1.unref(); 68 58 const flags = newPipeFlags(); 69 59 60 + const handle: StreamHandle = { 61 + port: port2, 62 + flags, 63 + readySignal: this.readySignal, 64 + highWater: this.highWater, 65 + }; 66 + 70 67 if (this.done) { 71 68 if (this.error) { 72 69 port1.postMessage({ done: true, error: this.error }); ··· 76 73 try { 77 74 port1.close(); 78 75 } catch {} 79 - return { port: port2, flags: flags.buffer, readySignal: this.readySignalBuf, highWater: this.highWater }; 76 + return handle; 80 77 } 81 78 82 79 const consumer: Consumer = { port: port1, flags }; 83 80 this.consumers.push(consumer); 84 81 85 82 port1.on('close', () => { 86 - consumer.flags.state.store(CANCEL); 83 + consumer.flags.fields.state.store(CANCEL); 87 84 const idx = this.consumers.indexOf(consumer); 88 85 if (idx >= 0) this.consumers.splice(idx, 1); 89 86 this.readySignal.add(1); ··· 96 93 this.readySignal.notify(); 97 94 void this.tryPull(); 98 95 99 - return { port: port2, flags: flags.buffer, readySignal: this.readySignalBuf, highWater: this.highWater }; 96 + return handle; 100 97 } 101 98 102 99 private findReadyConsumer(): Consumer | null { 103 100 const hw = this.highWater; 104 101 for (let i = 0; i < this.consumers.length; i++) { 105 102 const c = this.consumers[i]; 106 - if (c.flags.state.load() === CANCEL) continue; 107 - if (c.flags.inflight.load() < hw) return c; 103 + if (c.flags.fields.state.load() === CANCEL) continue; 104 + if (c.flags.fields.inflight.load() < hw) return c; 108 105 } 109 106 return null; 110 107 } ··· 117 114 while (!this.done) { 118 115 let consumer = this.findReadyConsumer(); 119 116 if (!consumer) { 120 - // All at cap (or no consumers) — park on readySignal until any 121 - // consumer pulls (decrements inflight + bumps readySignal) or 122 - // a new consumer joins or an existing one closes. 123 117 const signal = this.readySignal.load(); 124 - // Double-check to close the register-then-check race window. 125 118 consumer = this.findReadyConsumer(); 126 119 if (!consumer) { 127 - if (this.consumers.length === 0) { 128 - // No consumers at all — parking forever would leak the loop. 129 - // Bail; next addConsumer restarts us. 130 - return; 131 - } 120 + if (this.consumers.length === 0) return; 132 121 await this.readySignal.waitAsync(signal); 133 122 continue; 134 123 } ··· 153 142 collectTransferables(result.value, transferList); 154 143 try { 155 144 consumer.port.postMessage({ value: serialized, done: false }, transferList); 156 - consumer.flags.inflight.add(1); 145 + consumer.flags.fields.inflight.add(1); 157 146 } catch { 158 147 // Consumer may have closed between scan and post; move on. 159 148 }
+21 -13
src/execute.ts
··· 9 9 import { AsyncIterableTask } from './stream-task.ts'; 10 10 import { runStreamOnDedicated } from './dedicated-runner.ts'; 11 11 import { CHANNEL, Channel } from './channel.ts'; 12 - import { pipeIterable, newPipeFlags, CANCEL } from './pipe.ts'; 12 + import { 13 + pipeIterable, 14 + pipeIterableLegacy, 15 + newPipeFlags, 16 + CANCEL, 17 + DEFAULT_HIGH_WATER, 18 + serializeStreamHandle, 19 + } from './pipe.ts'; 20 + import type { StreamHandle } from './pipe.ts'; 13 21 import type { ChannelOptions } from './channel.ts'; 14 22 15 23 let nextCallId = 0; ··· 29 37 }); 30 38 } 31 39 32 - const DEFAULT_HIGH_WATER = 16; 33 - const LOW_WATER = 1; 34 - 35 40 function pipeToPort(iterable: AsyncIterable<unknown>, port: MessagePort, highWaterMark: number): void { 36 - void pipeIterable(iterable, port, { extractTransfers: true, yieldEvery: highWaterMark }); 41 + // Legacy raw-port path (pipeToPort is used for AsyncGenerator / AsyncIterableTask 42 + // args that arrive on the worker as MessagePort). Uses message-based backpressure. 43 + void pipeIterableLegacy(iterable, port, { extractTransfers: true, yieldEvery: highWaterMark }); 37 44 } 38 45 39 46 function isAsyncGenerator(arg: unknown): boolean { ··· 66 73 } 67 74 // Channel wrapper — single distributor loop + per-consumer atomics backpressure. 68 75 if (arg instanceof Channel) { 69 - const { port, flags, readySignal, highWater } = arg.addConsumer(); 70 - streamPortStack[streamPortStack.length - 1].push(port); 71 - return { __stream__: true, port, flags, readySignal, highWater }; 76 + const handle = arg.addConsumer(); 77 + streamPortStack[streamPortStack.length - 1].push(handle.port); 78 + return serializeStreamHandle(handle); 72 79 } 73 80 if (arg instanceof PromiseLikeTask) { 74 81 return { __task__: arg.uid, id: arg.id, args: arg.args.map(prepareArg) }; ··· 112 119 // + notifies on pull. Replaces pause/resume messages and the 113 120 // adaptive-yield setImmediate dance. 114 121 const flags = newPipeFlags(); 122 + const handle: StreamHandle = { port: port2, flags, highWater }; 115 123 116 124 const extracted = extractTransferables(args); 117 125 streamPortStack.push([]); 118 126 const preparedArgs = extracted.args.map(prepareArg); 119 127 const ports = streamPortStack.pop()!; 120 - const msg = { id, args: preparedArgs, port: port2, flags: flags.buffer, highWater }; 128 + const msg = { id, args: preparedArgs, stream: serializeStreamHandle(handle) }; 121 129 worker.postMessage(msg, [...extracted.transfer, ...ports, port2] as any[]); 122 130 123 131 let resolveDone: () => void; ··· 169 177 if (queue.length > 0) { 170 178 const value = queue.shift()!; 171 179 // Signal consumption to producer 172 - flags.inflight.sub(1); 173 - flags.inflight.notify(); 180 + flags.fields.inflight.sub(1); 181 + flags.fields.inflight.notify(); 174 182 return { done: false, value }; 175 183 } 176 184 if (error) throw error; ··· 181 189 } 182 190 }, 183 191 async return(): Promise<IteratorResult<T>> { 184 - flags.state.store(CANCEL); 185 - flags.inflight.notify(); 192 + flags.fields.state.store(CANCEL); 193 + flags.fields.inflight.notify(); 186 194 port1.close(); 187 195 resolveDone!(); 188 196 return { done: true, value: undefined };
+72 -56
src/pipe.ts
··· 1 1 import type { MessagePort, Transferable } from 'node:worker_threads'; 2 - import { serializeArg } from './shared/reconstruct.ts'; 2 + import { serializeArg, deserializeArg } from './shared/reconstruct.ts'; 3 3 import { collectTransferables, extractTransferables } from './transfer.ts'; 4 - import { Int32Atomic } from './shared/int32-atomic.ts'; 4 + import { shared } from './shared/shared.ts'; 5 + import { int32atomic } from './shared/descriptors.ts'; 6 + import type { Int32Atomic } from './shared/int32-atomic.ts'; 7 + import type { SharedStruct } from './shared/shared-struct.ts'; 5 8 6 - const RUN = 0; 7 - const CANCEL = 1; 8 - const FLAGS_BYTE_LEN = 8; 9 - const INFLIGHT_OFFSET = 0; 10 - const STATE_OFFSET = 4; 9 + export const RUN = 0; 10 + export const CANCEL = 1; 11 + export const DEFAULT_HIGH_WATER = 16; 11 12 12 - export interface PipeFlags { 13 - inflight: Int32Atomic; 14 - state: Int32Atomic; 15 - /** Backing SharedArrayBuffer — transport via postMessage, reconstruct with {@link pipeFlagsFromBuffer} on the other side. */ 16 - buffer: SharedArrayBuffer; 17 - } 13 + /** Inflight count + state for one producer/consumer pair. */ 14 + export type PipeFlags = SharedStruct<{ inflight: Int32Atomic; state: Int32Atomic }>; 18 15 19 16 export function newPipeFlags(): PipeFlags { 20 - const buffer = new SharedArrayBuffer(FLAGS_BYTE_LEN); 17 + return shared({ inflight: int32atomic, state: int32atomic }); 18 + } 19 + 20 + /** 21 + * Everything a producer and consumer need to agree on for one stream 22 + * endpoint: the MessagePort, the per-endpoint backpressure `flags`, an 23 + * optional `readySignal` used by multi-consumer channels, and the 24 + * `highWater` cap. One object captures it all. 25 + * 26 + * Cross postMessage via {@link serializeStreamHandle} + {@link deserializeStreamHandle}. 27 + */ 28 + export interface StreamHandle { 29 + port: MessagePort; 30 + flags: PipeFlags; 31 + readySignal?: Int32Atomic; 32 + highWater: number; 33 + } 34 + 35 + /** Transport form — safe to drop into a dispatch message; SABs pass by reference. */ 36 + export interface SerializedStreamHandle { 37 + __stream__: true; 38 + port: MessagePort; 39 + flags: unknown; 40 + readySignal?: unknown; 41 + highWater: number; 42 + } 43 + 44 + export function serializeStreamHandle(h: StreamHandle): SerializedStreamHandle { 21 45 return { 22 - inflight: new Int32Atomic(buffer, INFLIGHT_OFFSET), 23 - state: new Int32Atomic(buffer, STATE_OFFSET), 24 - buffer, 46 + __stream__: true, 47 + port: h.port, 48 + flags: serializeArg(h.flags), 49 + readySignal: h.readySignal ? serializeArg(h.readySignal) : undefined, 50 + highWater: h.highWater, 25 51 }; 26 52 } 27 53 28 - export function pipeFlagsFromBuffer(buffer: SharedArrayBuffer): PipeFlags { 54 + export function deserializeStreamHandle(obj: SerializedStreamHandle): StreamHandle { 29 55 return { 30 - inflight: new Int32Atomic(buffer, INFLIGHT_OFFSET), 31 - state: new Int32Atomic(buffer, STATE_OFFSET), 32 - buffer, 56 + port: obj.port, 57 + flags: deserializeArg(obj.flags) as PipeFlags, 58 + readySignal: obj.readySignal ? (deserializeArg(obj.readySignal) as Int32Atomic) : undefined, 59 + highWater: obj.highWater, 33 60 }; 34 61 } 35 62 63 + export function isSerializedStreamHandle(arg: unknown): arg is SerializedStreamHandle { 64 + return typeof arg === 'object' && arg !== null && (arg as any).__stream__ === true; 65 + } 66 + 36 67 export interface PipeOptions { 37 - /** How many emits without a natural event-loop tick before forcing a yield. Defaults to 16. Unused when `flags` is provided. */ 38 - yieldEvery?: number; 39 - /** Target in-flight count before the producer parks on the `flags.inflight` atomic. Defaults to 16. */ 40 - highWater?: number; 41 68 /** 42 69 * When true, call extractTransferables on each value to pull out any 43 70 * `transfer(buf)` markers into the transfer list. Set on the parent side 44 - * where user code may wrap values; off on the worker side where values 45 - * come from a user generator and don't need this processing. 71 + * where user code may wrap values; off on the worker side. 46 72 */ 47 73 extractTransfers?: boolean; 48 - /** 49 - * When provided, pipe uses atomics-based backpressure: the producer parks 50 - * on `flags.inflight` when it reaches `highWater`, the consumer 51 - * decrements + notifies on pull, and cancellation is signalled by writing 52 - * `CANCEL` to `flags.state`. Eliminates pause/resume messages and the 53 - * adaptive-yield overhead. 54 - */ 55 - flags?: PipeFlags; 56 74 } 57 75 58 76 /** 59 - * Pumps values from an async iterable to a MessagePort. Owns pause/resume 60 - * wiring, cancellation, serialization, and transferable collection. With 61 - * `flags` provided, uses atomics-based backpressure (tight, no per-item 62 - * yield); without, falls back to pause/resume messages with an adaptive 63 - * yield policy. 77 + * Atomics-based pipe. Pumps values from `source` onto `handle.port`, 78 + * parking on `flags.inflight` when it reaches `handle.highWater`. 64 79 */ 65 - export async function pipeIterable<T>(source: AsyncIterable<T>, port: MessagePort, opts: PipeOptions = {}): Promise<void> { 66 - if (opts.flags) return pipeWithAtomics(source, port, opts.flags, opts); 67 - return pipeWithMessages(source, port, opts); 68 - } 69 - 70 - async function pipeWithAtomics<T>( 80 + export async function pipeIterable<T>( 71 81 source: AsyncIterable<T>, 72 - port: MessagePort, 73 - flags: PipeFlags, 74 - opts: PipeOptions, 82 + handle: StreamHandle, 83 + opts: PipeOptions = {}, 75 84 ): Promise<void> { 76 - const highWater = opts.highWater ?? 16; 77 85 const extract = opts.extractTransfers ?? false; 78 - const { inflight, state } = flags; 86 + const { port, flags, highWater } = handle; 87 + const inflight = flags.fields.inflight; 88 + const state = flags.fields.state; 79 89 80 90 try { 81 91 for await (const value of source) { ··· 96 106 97 107 const newInflight = inflight.add(1) + 1; 98 108 if (newInflight >= highWater) { 99 - // Park until consumer drains below highWater or we're cancelled. 100 109 while (true) { 101 110 if (state.load() === CANCEL) return; 102 111 const current = inflight.load(); ··· 116 125 } catch {} 117 126 } 118 127 119 - async function pipeWithMessages<T>(source: AsyncIterable<T>, port: MessagePort, opts: PipeOptions): Promise<void> { 120 - const yieldEvery = opts.yieldEvery ?? 16; 128 + /** 129 + * Legacy message-based pipe (pause/resume + adaptive setImmediate). Used 130 + * for paths that haven't migrated to atomics — primarily the raw 131 + * AsyncGenerator arg path through pipeToPort. 132 + */ 133 + export async function pipeIterableLegacy<T>( 134 + source: AsyncIterable<T>, 135 + port: MessagePort, 136 + opts: { yieldEvery?: number; extractTransfers?: boolean } = {}, 137 + ): Promise<void> { 138 + const yieldEvery = opts.yieldEvery ?? DEFAULT_HIGH_WATER; 121 139 const extract = opts.extractTransfers ?? false; 122 140 123 141 let paused = false; ··· 198 216 port.close(); 199 217 } catch {} 200 218 } 201 - 202 - export { CANCEL, RUN };
+36 -41
src/worker-entry.ts
··· 3 3 import { registry } from './registry.ts'; 4 4 import { deserializeArg, serializeArg } from './shared/reconstruct.ts'; 5 5 import { collectTransferables } from './transfer.ts'; 6 - import { pipeIterable, pipeFlagsFromBuffer, CANCEL } from './pipe.ts'; 7 - import type { PipeFlags } from './pipe.ts'; 8 - import { Int32Atomic } from './shared/int32-atomic.ts'; 6 + import { 7 + pipeIterable, 8 + CANCEL, 9 + DEFAULT_HIGH_WATER, 10 + deserializeStreamHandle, 11 + isSerializedStreamHandle, 12 + } from './pipe.ts'; 13 + import type { StreamHandle, SerializedStreamHandle } from './pipe.ts'; 9 14 10 15 const imported = new Set<string>(); 11 16 const taskCache = new Map<number, unknown>(); ··· 17 22 return typeof arg === 'object' && arg !== null && '__task__' in arg; 18 23 } 19 24 20 - function isStreamArg(arg: unknown): arg is { 21 - __stream__: true; 22 - port: MessagePort; 23 - flags: SharedArrayBuffer; 24 - readySignal?: SharedArrayBuffer; 25 - highWater?: number; 26 - } { 27 - return typeof arg === 'object' && arg !== null && (arg as any).__stream__ === true; 28 - } 29 - 30 25 function needsAsyncResolve(arg: unknown): boolean { 31 - return arg instanceof MessagePort || isStreamArg(arg) || isTaskArg(arg); 26 + return arg instanceof MessagePort || isSerializedStreamHandle(arg) || isTaskArg(arg); 32 27 } 33 28 34 29 // Returns the function synchronously when cached — callers must not await ··· 51 46 return fn; 52 47 } 53 48 54 - function portToAsyncIterable<T>( 55 - port: MessagePort, 56 - flags?: PipeFlags, 57 - readySignal?: Int32Atomic, 58 - highWater: number = 16, 59 - ): AsyncIterable<T> { 49 + function portToAsyncIterable<T>(arg: MessagePort | StreamHandle): AsyncIterable<T> { 50 + const handle: StreamHandle | null = isHandle(arg) ? arg : null; 51 + const port: MessagePort = handle ? handle.port : (arg as MessagePort); 52 + const highWater = handle?.highWater ?? DEFAULT_HIGH_WATER; 53 + const inflight = handle?.flags.fields.inflight; 54 + const state = handle?.flags.fields.state; 55 + const readySignal = handle?.readySignal; 56 + 60 57 const queue: T[] = []; 61 58 let done = false; 62 59 let error: Error | null = null; 63 60 let waiting: (() => void) | null = null; 64 61 65 - // Legacy message-based backpressure (when flags is absent). 62 + // Legacy message-based backpressure (when handle is null). 66 63 let paused = false; 67 64 68 65 port.on('message', (msg: { value?: unknown; done?: boolean; error?: Error }) => { ··· 88 85 waiting(); 89 86 waiting = null; 90 87 } 91 - if (!flags && !paused && queue.length >= highWater) { 88 + if (!handle && !paused && queue.length >= highWater) { 92 89 paused = true; 93 90 port.postMessage('pause'); 94 91 } ··· 101 98 while (true) { 102 99 if (queue.length > 0) { 103 100 const value = queue.shift()!; 104 - if (flags) { 101 + if (inflight) { 105 102 // Atomics-based backpressure: decrement inflight. In stream 106 103 // mode the producer parks on inflight, so we notify it; 107 104 // in channel mode the producer parks on readySignal, and 108 105 // we only wake it on the cap→below-cap transition. 109 - const prev = flags.inflight.sub(1); 106 + const prev = inflight.sub(1); 110 107 if (readySignal) { 111 108 if (prev === highWater) { 112 109 readySignal.add(1); 113 110 readySignal.notify(); 114 111 } 115 112 } else { 116 - flags.inflight.notify(); 113 + inflight.notify(); 117 114 } 118 115 } else if (paused && queue.length <= 1) { 119 116 paused = false; ··· 129 126 } 130 127 }, 131 128 async return(): Promise<IteratorResult<T>> { 132 - if (flags) { 133 - flags.state.store(CANCEL); 134 - flags.inflight.notify(); 129 + if (state && inflight) { 130 + state.store(CANCEL); 131 + inflight.notify(); 135 132 if (readySignal) { 136 133 readySignal.add(1); 137 134 readySignal.notify(); ··· 145 142 }; 146 143 } 147 144 145 + function isHandle(arg: MessagePort | StreamHandle): arg is StreamHandle { 146 + return typeof arg === 'object' && arg !== null && 'flags' in arg; 147 + } 148 + 148 149 async function resolveArg(arg: unknown): Promise<unknown> { 149 - if (isStreamArg(arg)) { 150 - const flags = pipeFlagsFromBuffer(arg.flags); 151 - const readySignal = arg.readySignal ? new Int32Atomic(arg.readySignal, 0) : undefined; 152 - return portToAsyncIterable(arg.port, flags, readySignal, arg.highWater); 150 + if (isSerializedStreamHandle(arg)) { 151 + return portToAsyncIterable(deserializeStreamHandle(arg)); 153 152 } 154 153 if (arg instanceof MessagePort) { 155 154 return portToAsyncIterable(arg); ··· 178 177 callId?: number; 179 178 id: string; 180 179 args: unknown[]; 181 - port?: MessagePort; 182 - flags?: SharedArrayBuffer; 183 - highWater?: number; 180 + /** Present on streaming dispatches; bundles port + flags + highWater. */ 181 + stream?: SerializedStreamHandle; 184 182 }; 185 183 186 184 function postResult(callId: number, value: unknown): void { ··· 261 259 } 262 260 263 261 async function handleStreamTask(msg: TaskMsg): Promise<void> { 264 - const { id, args, port, flags } = msg; 262 + const { id, args } = msg; 265 263 const callId = msg.callId; 266 264 try { 267 265 const fnM = resolveFn(id); ··· 272 270 resolvedArgs[i] = needsAsyncResolve(a) ? await resolveArg(a) : deserializeArg(a); 273 271 } 274 272 const gen = fn(...resolvedArgs) as AsyncGenerator; 275 - await pipeIterable(gen, port!, { 276 - flags: flags ? pipeFlagsFromBuffer(flags) : undefined, 277 - highWater: msg.highWater, 278 - }); 273 + await pipeIterable(gen, deserializeStreamHandle(msg.stream!)); 279 274 } catch (err) { 280 275 if (callId != null) postError(callId, err); 281 276 } 282 277 } 283 278 284 279 parentPort!.on('message', (msg: TaskMsg) => { 285 - if (msg.port) { 280 + if (msg.stream) { 286 281 void handleStreamTask(msg); 287 282 } else { 288 283 handleValueTask(msg);