Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: use Int32Atomic for pipe backpressure; add waitAsync/notify

Dogfood moroutine's own shared-memory primitives for the stream
backpressure path instead of reinventing the Atomics layer.

- Add `waitAsync(expected, timeoutMs?)` and `notify(count?)` methods
to Int32Atomic, wrapping `Atomics.waitAsync` / `Atomics.notify`.
Returns 'ok' | 'not-equal' | 'timed-out' from the wait. 5 new tests.
- Replace raw Int32Array + Atomics.* calls in pipe.ts with Int32Atomic
method calls. A PipeFlags struct holds two Int32Atomic views
(inflight at offset 0, state at offset 4) over a single 8-byte SAB.
- Expose newPipeFlags() / pipeFlagsFromBuffer() so callers only work
with typed wrappers; the raw SharedArrayBuffer is passed over the
dispatch message edge but both sides reconstruct PipeFlags from it.

Behavior unchanged — same throughput (~450K/s) and backpressure
tightness (~89ms steady latency with 5ms-per-item consumer).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+115 -35
+9 -10
src/execute.ts
··· 9 9 import { AsyncIterableTask } from './stream-task.ts'; 10 10 import { runStreamOnDedicated } from './dedicated-runner.ts'; 11 11 import { CHANNEL, Channel } from './channel.ts'; 12 - import { pipeIterable, FLAGS_BYTE_LEN, INFLIGHT, STATE, CANCEL } from './pipe.ts'; 12 + import { pipeIterable, newPipeFlags, CANCEL } from './pipe.ts'; 13 13 import type { ChannelOptions } from './channel.ts'; 14 14 15 15 let nextCallId = 0; ··· 108 108 109 109 const { port1, port2 } = new MessageChannel(); 110 110 // Atomics-based backpressure: shared inflight count + cancel flag. 111 - // Worker parks on INFLIGHT when it hits highWater; parent decrements 112 - // on pull and notifies. Replaces pause/resume messages and the 111 + // Worker parks on flags.inflight when it hits highWater; parent decrements 112 + // + notifies on pull. Replaces pause/resume messages and the 113 113 // adaptive-yield setImmediate dance. 114 - const flagsBuf = new SharedArrayBuffer(FLAGS_BYTE_LEN); 115 - const flags = new Int32Array(flagsBuf); 114 + const flags = newPipeFlags(); 116 115 117 116 const extracted = extractTransferables(args); 118 117 streamPortStack.push([]); 119 118 const preparedArgs = extracted.args.map(prepareArg); 120 119 const ports = streamPortStack.pop()!; 121 - const msg = { id, args: preparedArgs, port: port2, flags: flagsBuf }; 120 + const msg = { id, args: preparedArgs, port: port2, flags: flags.buffer }; 122 121 worker.postMessage(msg, [...extracted.transfer, ...ports, port2] as any[]); 123 122 124 123 let resolveDone: () => void; ··· 170 169 if (queue.length > 0) { 171 170 const value = queue.shift()!; 172 171 // Signal consumption to producer 173 - Atomics.sub(flags, INFLIGHT, 1); 174 - Atomics.notify(flags, INFLIGHT); 172 + flags.inflight.sub(1); 173 + flags.inflight.notify(); 175 174 return { done: false, value }; 176 175 } 177 176 if (error) throw error; ··· 182 181 } 183 182 }, 184 183 async return(): Promise<IteratorResult<T>> { 185 - Atomics.store(flags, STATE, CANCEL); 186 - Atomics.notify(flags, INFLIGHT); 184 + flags.state.store(CANCEL); 185 + flags.inflight.notify(); 187 186 port1.close(); 188 187 resolveDone!(); 189 188 return { done: true, value: undefined };
+49 -22
src/pipe.ts
··· 1 1 import type { MessagePort, Transferable } from 'node:worker_threads'; 2 2 import { serializeArg } from './shared/reconstruct.ts'; 3 3 import { collectTransferables, extractTransferables } from './transfer.ts'; 4 + import { Int32Atomic } from './shared/int32-atomic.ts'; 4 5 5 - export const INFLIGHT = 0; 6 - export const STATE = 1; 7 - export const RUN = 0; 8 - export const CANCEL = 1; 9 - export const FLAGS_BYTE_LEN = 2 * Int32Array.BYTES_PER_ELEMENT; 6 + const RUN = 0; 7 + const CANCEL = 1; 8 + const FLAGS_BYTE_LEN = 8; 9 + const INFLIGHT_OFFSET = 0; 10 + const STATE_OFFSET = 4; 11 + 12 + export interface PipeFlags { 13 + inflight: Int32Atomic; 14 + state: Int32Atomic; 15 + /** Backing SharedArrayBuffer — transport via postMessage, reconstruct with {@link pipeFlagsFromBuffer} on the other side. */ 16 + buffer: SharedArrayBuffer; 17 + } 18 + 19 + export function newPipeFlags(): PipeFlags { 20 + const buffer = new SharedArrayBuffer(FLAGS_BYTE_LEN); 21 + return { 22 + inflight: new Int32Atomic(buffer, INFLIGHT_OFFSET), 23 + state: new Int32Atomic(buffer, STATE_OFFSET), 24 + buffer, 25 + }; 26 + } 27 + 28 + export function pipeFlagsFromBuffer(buffer: SharedArrayBuffer): PipeFlags { 29 + return { 30 + inflight: new Int32Atomic(buffer, INFLIGHT_OFFSET), 31 + state: new Int32Atomic(buffer, STATE_OFFSET), 32 + buffer, 33 + }; 34 + } 10 35 11 36 export interface PipeOptions { 12 37 /** How many emits without a natural event-loop tick before forcing a yield. Defaults to 16. Unused when `flags` is provided. */ 13 38 yieldEvery?: number; 14 - /** Target in-flight count before the producer parks on Atomics. Defaults to 16. Used only with `flags`. */ 39 + /** Target in-flight count before the producer parks on the `flags.inflight` atomic. Defaults to 16. */ 15 40 highWater?: number; 16 41 /** 17 42 * When true, call extractTransferables on each value to pull out any ··· 21 46 */ 22 47 extractTransfers?: boolean; 23 48 /** 24 - * When provided, pipe uses an atomics-based backpressure protocol: 25 - * the producer parks on `INFLIGHT` when it reaches `highWater`, the 26 - * consumer increments/decrements via `Atomics.sub`+`notify` on pull, 27 - * and cancellation is signalled by writing `CANCEL` to the `STATE` slot. 28 - * Eliminates pause/resume messages and the adaptive-yield overhead. 49 + * When provided, pipe uses atomics-based backpressure: the producer parks 50 + * on `flags.inflight` when it reaches `highWater`, the consumer 51 + * decrements + notifies on pull, and cancellation is signalled by writing 52 + * `CANCEL` to `flags.state`. Eliminates pause/resume messages and the 53 + * adaptive-yield overhead. 29 54 */ 30 - flags?: Int32Array; 55 + flags?: PipeFlags; 31 56 } 32 57 33 58 /** ··· 45 70 async function pipeWithAtomics<T>( 46 71 source: AsyncIterable<T>, 47 72 port: MessagePort, 48 - flags: Int32Array, 73 + flags: PipeFlags, 49 74 opts: PipeOptions, 50 75 ): Promise<void> { 51 76 const highWater = opts.highWater ?? 16; 52 77 const extract = opts.extractTransfers ?? false; 78 + const { inflight, state } = flags; 53 79 54 80 try { 55 81 for await (const value of source) { 56 - if (Atomics.load(flags, STATE) === CANCEL) return; 82 + if (state.load() === CANCEL) return; 57 83 58 84 let serialized: unknown; 59 85 const transferList: Transferable[] = []; ··· 68 94 } 69 95 port.postMessage({ value: serialized, done: false }, transferList); 70 96 71 - const newInflight = Atomics.add(flags, INFLIGHT, 1) + 1; 97 + const newInflight = inflight.add(1) + 1; 72 98 if (newInflight >= highWater) { 73 - // Park until parent drains below highWater or we're cancelled. 99 + // Park until consumer drains below highWater or we're cancelled. 74 100 while (true) { 75 - if (Atomics.load(flags, STATE) === CANCEL) return; 76 - const current = Atomics.load(flags, INFLIGHT); 101 + if (state.load() === CANCEL) return; 102 + const current = inflight.load(); 77 103 if (current < highWater) break; 78 - const { async, value: p } = Atomics.waitAsync(flags, INFLIGHT, current); 79 - if (async) await p; 104 + await inflight.waitAsync(current); 80 105 } 81 106 } 82 107 } 83 - if (Atomics.load(flags, STATE) !== CANCEL) port.postMessage({ done: true }); 108 + if (state.load() !== CANCEL) port.postMessage({ done: true }); 84 109 } catch (err) { 85 - if (Atomics.load(flags, STATE) !== CANCEL) { 110 + if (state.load() !== CANCEL) { 86 111 port.postMessage({ done: true, error: err instanceof Error ? err : new Error(String(err)) }); 87 112 } 88 113 } ··· 173 198 port.close(); 174 199 } catch {} 175 200 } 201 + 202 + export { CANCEL, RUN };
+20
src/shared/int32-atomic.ts
··· 47 47 return Atomics.compareExchange(this.view, 0, expected, replacement); 48 48 } 49 49 50 + /** 51 + * Parks asynchronously until the slot's value differs from `expected` or the optional timeout elapses. 52 + * Returns `'not-equal'` synchronously (no await) if the slot already holds a different value. 53 + * @param expected - Value to wait against; wake when the slot's value is observed to differ. 54 + * @param timeoutMs - Optional timeout in milliseconds. Defaults to Infinity. 55 + * @returns `'ok'` if woken by a notify, `'not-equal'` if the value already differed, `'timed-out'` if the timeout elapsed. 56 + */ 57 + async waitAsync(expected: number, timeoutMs?: number): Promise<'ok' | 'not-equal' | 'timed-out'> { 58 + const r = Atomics.waitAsync(this.view, 0, expected, timeoutMs); 59 + return r.async ? await r.value : r.value; 60 + } 61 + 62 + /** 63 + * Wakes up to `count` waiters on this slot. Returns the number actually woken. 64 + * @param count - Maximum number of waiters to wake. Defaults to Infinity. 65 + */ 66 + notify(count: number = Infinity): number { 67 + return Atomics.notify(this.view, 0, count); 68 + } 69 + 50 70 [Symbol.for('moroutine.shared')](): { tag: string; buffer: SharedArrayBuffer; byteOffset: number } { 51 71 return { tag: 'Int32Atomic', buffer: this.view.buffer as SharedArrayBuffer, byteOffset: this.view.byteOffset }; 52 72 }
+2 -3
src/worker-entry.ts
··· 3 3 import { registry } from './registry.ts'; 4 4 import { deserializeArg, serializeArg } from './shared/reconstruct.ts'; 5 5 import { collectTransferables } from './transfer.ts'; 6 - import { pipeIterable } from './pipe.ts'; 6 + import { pipeIterable, pipeFlagsFromBuffer } from './pipe.ts'; 7 7 8 8 const imported = new Set<string>(); 9 9 const taskCache = new Map<number, unknown>(); ··· 221 221 resolvedArgs[i] = needsAsyncResolve(a) ? await resolveArg(a) : deserializeArg(a); 222 222 } 223 223 const gen = fn(...resolvedArgs) as AsyncGenerator; 224 - const flagsView = flags ? new Int32Array(flags) : undefined; 225 - await pipeIterable(gen, port!, { flags: flagsView }); 224 + await pipeIterable(gen, port!, { flags: flags ? pipeFlagsFromBuffer(flags) : undefined }); 226 225 } catch (err) { 227 226 if (callId != null) postError(callId, err); 228 227 }
+35
test/shared/int32-atomic.test.ts
··· 81 81 assert.equal(actual, 42); 82 82 assert.equal(a.load(), 42); 83 83 }); 84 + 85 + it('waitAsync returns "not-equal" synchronously when value already differs', async () => { 86 + const a = int32atomic(); 87 + a.store(5); 88 + const result = await a.waitAsync(0); 89 + assert.equal(result, 'not-equal'); 90 + }); 91 + 92 + it('waitAsync resolves "ok" when a concurrent store + notify happens', async () => { 93 + const a = int32atomic(); 94 + const wait = a.waitAsync(0); 95 + setImmediate(() => { 96 + a.store(1); 97 + a.notify(); 98 + }); 99 + assert.equal(await wait, 'ok'); 100 + assert.equal(a.load(), 1); 101 + }); 102 + 103 + it('waitAsync resolves "timed-out" when no notify arrives', async () => { 104 + const a = int32atomic(); 105 + const result = await a.waitAsync(0, 20); 106 + assert.equal(result, 'timed-out'); 107 + }); 108 + 109 + it('notify returns count of waiters woken', async () => { 110 + const a = int32atomic(); 111 + const w1 = a.waitAsync(0); 112 + const w2 = a.waitAsync(0); 113 + const woken = a.notify(); 114 + a.store(1); 115 + a.notify(); 116 + assert.equal(woken + a.notify(), 2); 117 + await Promise.all([w1, w2]); 118 + }); 84 119 });