Offload functions to worker threads with shared memory primitives for Node.js.

chore: review cleanups, backpressure tests, changeset

- Remove unused RUN export from pipe.ts
- Remove unused CHANNEL import from execute.ts
- Generalize ChannelOptions.highWaterMark tsdoc (streams + channels)
- README: add Int32Atomic waitAsync/notify section; note atomics backpressure
- Add backpressure tests verifying producer actually stalls when consumer
is slow — peak (emitted - consumed) gap stays within highWater + slop
- Add .changeset/stream-atomics.md (minor bump)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+139 -20
+32
.changeset/stream-atomics.md
````diff
+ ---
+ 'moroutine': minor
+ ---
+
+ Atomics-based backpressure for streams and channels; new `Int32Atomic#waitAsync` / `#notify`
+
+ **New public API on `Int32Atomic`**:
+
+ ```ts
+ const slot = int32atomic();
+ // Park until the slot holds a value other than `expected`, with optional timeout.
+ await slot.waitAsync(0); // 'ok' | 'not-equal' | 'timed-out'
+ await slot.waitAsync(0, 100 /*ms*/);
+ // Wake waiters on this slot.
+ slot.notify(); // wake all
+ slot.notify(2); // wake at most 2
+ ```
+
+ Matches `Atomics.waitAsync` / `Atomics.notify` semantics; returns `'not-equal'` synchronously (no microtask hop) if the slot already holds something different from `expected`.
+
+ **Streams and channels use atomics for backpressure**: streaming tasks and `channel()` no longer rely on `pause`/`resume` port messages for flow control. Internally, a `SharedArrayBuffer` tracks per-endpoint `inflight` and `state`; the producer parks on `Atomics.waitAsync` when the `highWaterMark` cap is reached and resumes when the consumer drains below it. Behavior-equivalent and much tighter under backpressure (worst-case buffering drops from ~10-20× `highWaterMark` to `highWaterMark + 1-2`).
+
+ **`highWaterMark` option now honored** for both `run(streamTask, { highWaterMark })` and `channel(src, { highWaterMark })`. Previously read and discarded in the streaming path.
+
+ **Significant throughput improvements** on streaming and channel fan-out:
+
+ - Worker → parent stream: ~66K items/s → ~750K items/s (11×)
+ - Round-trip stream: ~39K items/s → ~425K items/s (11×)
+ - Channel fan-out at 8 consumers: ~412K items/s → ~546K items/s (+33%)
+ - Per-task dispatch: ~289K → ~328K (+14%)
+
+ No changes to user-facing API shape beyond the `Int32Atomic` additions.
````
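The parking mechanism the changeset describes lives in `pipe.ts` and is not part of this commit's diff. As an illustration only, here is a minimal sketch of that pattern built from the public `Int32Atomic` API above; names such as `inflight`, `HIGH_WATER`, `produce`, and `onItemDrained` are invented for the example and are not library internals.

```ts
import { int32atomic } from 'moroutine';

const HIGH_WATER = 16;
const inflight = int32atomic(); // shared slot; both endpoints hold a reference to it

// Producer endpoint: park whenever the in-flight count sits at the cap.
async function produce(values: AsyncIterable<unknown>, send: (v: unknown) => void) {
  for await (const value of values) {
    let current = inflight.load();
    while (current >= HIGH_WATER) {
      // Parks until the slot's value changes from `current`; resolves immediately
      // with 'not-equal' if the consumer already drained something in the meantime.
      await inflight.waitAsync(current);
      current = inflight.load();
    }
    inflight.add(1);
    send(value);
  }
}

// Consumer endpoint: each drained item lowers the count and wakes the producer.
function onItemDrained() {
  inflight.sub(1);
  inflight.notify();
}
```

The re-check loop around `waitAsync` avoids lost wakeups: if the consumer drains between the `load` and the wait, `waitAsync` returns `'not-equal'` immediately and the high-water condition is evaluated again.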
+19 -1
README.md
````diff
···
  
  Full atomic operations: `load`, `store`, `add`, `sub`, `and`, `or`, `xor`, `exchange`, `compareExchange`.
  
+ `Int32Atomic` additionally exposes futex-style wait / wake:
+
+ ```ts
+ const slot = int32atomic();
+
+ // In one thread — wait until the slot's value differs from 0,
+ // with an optional timeout.
+ await slot.waitAsync(0); // 'ok' | 'not-equal' | 'timed-out'
+ await slot.waitAsync(0, 100 /*ms*/); // with timeout
+
+ // In another thread — change the value and wake waiters.
+ slot.store(1);
+ slot.notify(); // wakes all waiters on this slot
+ slot.notify(2); // wakes at most 2
+ ```
+
+ `waitAsync` returns `'not-equal'` synchronously (no microtask hop) if the slot already holds a value different from `expected`. The underlying `Atomics.wait`/`notify` futex is available only on `Int32Atomic`.
+
  #### Structs
  
  Plain objects in `shared()` create structs backed by a single `SharedArrayBuffer`.
···
  
  ### Streaming Moroutines
  
- Wrap an `async function*` with `mo()` to create a streaming moroutine. Values are streamed between threads via `MessageChannel` with pause/resume backpressure.
+ Wrap an `async function*` with `mo()` to create a streaming moroutine. Values are streamed between threads via `MessageChannel` with atomics-based backpressure — the producer parks when `highWaterMark` items (default 16) sit in flight and resumes when the consumer drains below the cap.
  
  ```ts
  // count.ts
````
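The new README section says `waitAsync` matches `Atomics.waitAsync` semantics, including the synchronous `'not-equal'` path. For reference, a sketch of the underlying standard primitives in plain Node.js; this is the built-in `Atomics` API, not moroutine code.

```ts
// A raw Int32Array over shared memory, the only views Atomics.waitAsync accepts.
const i32 = new Int32Array(new SharedArrayBuffer(4));

const r = Atomics.waitAsync(i32, 0, 0, 100); // wait while i32[0] === 0, up to 100 ms
if (r.async) {
  // A real wait was registered; the promise settles to 'ok' or 'timed-out'.
  r.value.then((outcome) => console.log(outcome));
} else {
  // No wait needed: value is 'not-equal' (or 'timed-out' when the timeout is 0).
  console.log(r.value);
}

// In another thread, or later on this one: change the value and wake waiters.
Atomics.store(i32, 0, 1);
Atomics.notify(i32, 0); // returns the number of agents woken
```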
+12 -4
examples/benchmark-dispatch/main.ts
````diff
···
  
    console.log('ping-pong latency (strict await-each, 1 worker):');
    const pp = await pingPong(SEQ);
-   console.log(`  ${SEQ.toLocaleString()} tasks ${Math.round(pp).toLocaleString().padStart(9)} ops/s (${(1e6 / pp).toFixed(1)} µs/roundtrip)`);
+   console.log(
+     `  ${SEQ.toLocaleString()} tasks ${Math.round(pp).toLocaleString().padStart(9)} ops/s (${(1e6 / pp).toFixed(1)} µs/roundtrip)`,
+   );
  
    console.log('\npipelined throughput (1 worker, in-flight window):');
    for (const window of [4, 16, 64, 256]) {
···
    const str1 = await streamingOneWay(PAR);
    const ch1 = await channelFanout(PAR, 1);
    const strRT = await streaming(PAR);
-   console.log(`  stream (worker generates → parent consumes) ${Math.round(str1).toLocaleString().padStart(9)} items/s`);
-   console.log(`  channel (parent generates → worker consumes) ${Math.round(ch1).toLocaleString().padStart(9)} items/s`);
-   console.log(`  round-trip stream (parent → worker → parent, 2 hops) ${Math.round(strRT).toLocaleString().padStart(9)} items/s`);
+   console.log(
+     `  stream (worker generates → parent consumes) ${Math.round(str1).toLocaleString().padStart(9)} items/s`,
+   );
+   console.log(
+     `  channel (parent generates → worker consumes) ${Math.round(ch1).toLocaleString().padStart(9)} items/s`,
+   );
+   console.log(
+     `  round-trip stream (parent → worker → parent, 2 hops) ${Math.round(strRT).toLocaleString().padStart(9)} items/s`,
+   );
  
    console.log('\nbatch vs stream vs per-task (1 worker, 100K items):');
    const perTask = await parallel(PAR, 1);
````
+9 -3
src/channel.ts
````diff
···
  
  const CHANNEL = Symbol.for('moroutine.channel');
  
- /** Options for configuring a channel. */
+ /** Options for configuring a streaming dispatch or channel. */
  export interface ChannelOptions {
-   /** Max items any one consumer can have in-flight before the distributor
-    * skips it and gives items to other ready consumers. Defaults to 16. */
+   /**
+    * Maximum number of in-flight items on a stream before the producer parks.
+    * For a streaming task (`run(streamTask)`), this is how many items the
+    * worker may have emitted ahead of the parent's consumption. For a
+    * `channel()`, it's the per-consumer cap — the distributor skips any
+    * consumer at cap and gives items to another ready consumer.
+    * Defaults to 16.
+    */
    highWaterMark?: number;
  }
  
````
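As a usage sketch of the two call sites this tsdoc now covers, mirroring the tests added in this commit: `countUp` and `sumStream` are existing fixture names from the test suite, and their argument shapes here are assumed for illustration only.

```ts
import { workers, channel } from 'moroutine';
import { countUp } from './fixtures/stream-gen.ts';
import { sumStream } from './fixtures/stream-input.ts';

using run = workers(1);

// Streaming task: the worker may emit at most 4 items ahead of the parent.
for await (const _ of run(countUp(1_000), { highWaterMark: 4 })) {
  // slow consumer work here; the worker-side generator parks at the cap
}

// Channel input: cap of 4 on items in flight from parent to this consumer.
async function* source() {
  for (let i = 0; i < 1_000; i++) yield i;
}
await run(sumStream(channel(source(), { highWaterMark: 4 })));
```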
+1 -1
src/execute.ts
````diff
···
  import { PromiseLikeTask } from './task.ts';
  import { AsyncIterableTask } from './stream-task.ts';
  import { runStreamOnDedicated } from './dedicated-runner.ts';
- import { CHANNEL, Channel } from './channel.ts';
+ import { Channel } from './channel.ts';
  import { pipeIterable, newPipeFlags, CANCEL, DEFAULT_HIGH_WATER, serializeStreamHandle } from './pipe.ts';
  import type { StreamHandle, SerializedStreamHandle } from './pipe.ts';
  import type { ChannelOptions } from './channel.ts';
````
-2
src/pipe.ts
````diff
···
  import type { Int32Atomic } from './shared/int32-atomic.ts';
  import type { SharedStruct } from './shared/shared-struct.ts';
  
- export const RUN = 0;
  export const CANCEL = 1;
  export const DEFAULT_HIGH_WATER = 16;
  
···
      port.close();
    } catch {}
  }
- 
````
+1 -7
src/worker-entry.ts
````diff
···
  import { registry } from './registry.ts';
  import { deserializeArg, serializeArg } from './shared/reconstruct.ts';
  import { collectTransferables } from './transfer.ts';
- import {
-   pipeIterable,
-   CANCEL,
-   DEFAULT_HIGH_WATER,
-   deserializeStreamHandle,
-   isSerializedStreamHandle,
- } from './pipe.ts';
+ import { pipeIterable, CANCEL, DEFAULT_HIGH_WATER, deserializeStreamHandle, isSerializedStreamHandle } from './pipe.ts';
  import type { StreamHandle, SerializedStreamHandle } from './pipe.ts';
  
  const imported = new Set<string>();
````
+30
test/fixtures/backpressure.ts
````diff
+ import { setTimeout as sleep } from 'node:timers/promises';
+ import { mo } from 'moroutine';
+ import type { Int32Atomic } from 'moroutine';
+
+ // Worker-side generator that yields [0..n) and bumps `emitted` after each
+ // yield resumes — i.e. after the value has been accepted by the pipe. Used
+ // to observe producer-side backpressure from the parent.
+ export const countingGen = mo(import.meta, async function* (n: number, emitted: Int32Atomic) {
+   for (let i = 0; i < n; i++) {
+     yield i;
+     emitted.add(1);
+   }
+ });
+
+ // Worker-side consumer that drains an AsyncIterable slowly and records the
+ // peak observed `emitted - consumed` gap. Returns the peak.
+ export const slowSumPeakGap = mo(
+   import.meta,
+   async (input: AsyncIterable<number>, emitted: Int32Atomic, delayMs: number): Promise<number> => {
+     let consumed = 0;
+     let peak = 0;
+     for await (const _ of input) {
+       consumed++;
+       const gap = emitted.load() - consumed;
+       if (gap > peak) peak = gap;
+       await sleep(delayMs);
+     }
+     return peak;
+   },
+ );
````
+16 -1
test/stream-input.test.ts
````diff
  import { describe, it } from 'node:test';
  import assert from 'node:assert/strict';
- import { workers, channel } from 'moroutine';
+ import { workers, channel, int32atomic } from 'moroutine';
  import { sumStream, uppercaseStream } from './fixtures/stream-input.ts';
+ import { slowSumPeakGap } from './fixtures/backpressure.ts';
  
  describe('channel() input (main -> worker)', () => {
    it('pipes async iterable to worker', async () => {
···
      } finally {
        run[Symbol.dispose]();
      }
+   });
+
+   it('honors channel highWaterMark: parent producer stalls for slow worker', async () => {
+     using run = workers(1);
+     const highWaterMark = 4;
+     const emitted = int32atomic();
+     async function* source() {
+       for (let i = 0; i < 50; i++) {
+         yield i;
+         emitted.add(1);
+       }
+     }
+     const peak = await run(slowSumPeakGap(channel(source(), { highWaterMark }), emitted, 2));
+     assert.ok(peak <= highWaterMark + 3, `expected peak gap <= ${highWaterMark + 3}, got ${peak}`);
    });
  });
````
+19 -1
test/stream.test.ts
````diff
  import { describe, it } from 'node:test';
+ import { setTimeout as sleep } from 'node:timers/promises';
  import assert from 'node:assert/strict';
- import { workers } from 'moroutine';
+ import { workers, int32atomic } from 'moroutine';
  import { countUp, failAfter } from './fixtures/stream-gen.ts';
+ import { countingGen } from './fixtures/backpressure.ts';
  
  describe('streaming moroutines (worker -> main)', () => {
    it('iterates yielded values from worker', async () => {
···
      } finally {
        run[Symbol.dispose]();
      }
+   });
+
+   it('honors highWaterMark: slow consumer stalls the producer', async () => {
+     using run = workers(1);
+     const highWaterMark = 4;
+     const emitted = int32atomic();
+     let consumed = 0;
+     let peak = 0;
+     for await (const _ of run(countingGen(50, emitted), { highWaterMark })) {
+       consumed++;
+       const gap = emitted.load() - consumed;
+       if (gap > peak) peak = gap;
+       await sleep(2);
+     }
+     // With highWater=4, worst-case observable gap is ~highWater + 2 slop.
+     assert.ok(peak <= highWaterMark + 3, `expected peak gap <= ${highWaterMark + 3}, got ${peak}`);
    });
  });
````