commits
serializeStructField had a SharedStruct branch but no Tuple branch, so a
Tuple used as a SharedStruct field (or as a Tuple element) fell through
to the primitive-shape branch and lost its `elements`. Worker-side
deserialization then crashed on `data.elements.map(...)` with
`Cannot read properties of undefined (reading 'map')`. Mirror the
top-level serializeArg Tuple case in the recursive helper so nested
tuples carry their elements across.
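A sketch of the mirrored branch (serialized shape and surrounding helper
names assumed, not the actual implementation):

    // Hypothetical: the recursive helper gains the same Tuple case as
    // the top-level serializeArg.
    function serializeStructField(value: unknown): unknown {
      if (value instanceof Tuple) {
        // Recurse so nested tuples keep their elements across the boundary.
        return { kind: 'tuple', elements: value.elements.map(serializeStructField) };
      }
      if (value instanceof SharedStruct) { /* existing SharedStruct branch */ }
      return value; // previous primitive-shape fall-through
    }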
Adds cross-worker regression tests for struct-with-tuple-of-primitives,
struct-with-tuple-of-structs, and tuple-of-tuples.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Previously findReadyConsumer scanned from index 0 and picked the first
below-cap consumer, so consumer 0 won nearly every tie and the first
worker received a disproportionate share of items. Now the scan starts
from a rotating cursor, giving every below-cap consumer a turn.
Skip-based round-robin semantics are preserved — consumers at cap are still
skipped so a saturated worker doesn't stall the pipeline. Adds a
dedicated benchmark (examples/benchmark-dispatch/fanout.ts) that
measures distribution spread at different volumes and under
backpressure; tightens the fan-out tests to use the
run.workers.map(assign) pattern so results don't depend on the pool's
default balancer.
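A sketch of the rotating scan (per-consumer bookkeeping shape assumed):

    interface Consumer { inflight: number } // hypothetical per-consumer state

    let cursor = 0;
    function findReadyConsumer(consumers: Consumer[], cap: number): Consumer | undefined {
      const n = consumers.length;
      for (let i = 0; i < n; i++) {
        const idx = (cursor + i) % n;
        if (consumers[idx].inflight < cap) { // at-cap consumers are still skipped
          cursor = (idx + 1) % n;            // next scan starts past this winner
          return consumers[idx];
        }
      }
      return undefined; // everyone at cap: the distributor parks
    }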
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Remove unused RUN export from pipe.ts
- Remove unused CHANNEL import from execute.ts
- Generalize ChannelOptions.highWaterMark tsdoc (streams + channels)
- README: add Int32Atomic waitAsync/notify section; note atomics backpressure
- Add backpressure tests verifying the producer actually stalls when the
consumer is slow — peak (emitted - consumed) gap stays within highWater +
slop (sketched below)
- Add .changeset/stream-atomics.md (minor bump)
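The backpressure assertion, roughly (the slop constant is hypothetical):

    import assert from 'node:assert';

    // Track the producer's lead over a deliberately slow consumer.
    let emitted = 0, consumed = 0, peak = 0;
    const recordEmit = () => { emitted++; peak = Math.max(peak, emitted - consumed); };
    const recordConsume = () => { consumed++; };
    // ...run the stream, calling recordEmit / recordConsume from each side...
    const highWaterMark = 16, SLOP = 4; // SLOP: allowance for in-flight overshoot
    assert.ok(peak <= highWaterMark + SLOP); // producer actually parked at the cap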
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Remove pipeIterableLegacy, the pipeToPort wrapper, and portToAsyncIterable's
message-based branch. AsyncGenerator and AsyncIterableTask args now produce
a serialized StreamHandle the same way Channel does, and the worker
receives one via portToAsyncIterable's atomics path.
All stream-arg producers now use pipeIterable with a StreamHandle; the
worker always receives a StreamHandle. One code path, one data shape,
no more "legacy vs atomics" branches.
~150 lines removed. Tests 332/332. No bench change observed — the
round-trip stream bottleneck moved to the worker-side passthrough
generator, not the transport. Correctness/consistency win, not a perf
win.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Unify how "one streaming endpoint" is represented and transported:
- PipeFlags is now a SharedStruct<{inflight, state}> allocated via
moroutine's own shared({...}) primitive. No more hand-rolled byte
offsets or manual Int32Atomic construction over a raw SAB.
- StreamHandle is a first-class object carrying {port, flags,
readySignal?, highWater} — the complete description of one end of
a backpressured stream.
- serializeStreamHandle / deserializeStreamHandle / isSerializedStreamHandle
round-trip a handle through postMessage using the existing
serializeArg/deserializeArg machinery. No custom "rebuild from
buffer" helper — the shared-primitive layer already does this.
Callers no longer juggle free-floating {port, flags: SAB, readySignal:
SAB, highWater: number}; they pass a StreamHandle. dispatchStream's
message now has a single `stream` field; Channel.addConsumer returns
a StreamHandle directly; prepareArg serializes it for transport.
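As a type sketch, the handle described above (field types assumed):

    interface StreamHandle {
      port: MessagePort;                 // data plane
      flags: SharedStruct<{ inflight: number; state: number }>; // backpressure state
      readySignal?: Int32Atomic;         // set by Channel for distributor wakeups
      highWater: number;                 // producer parks once inflight reaches this
    }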
Behavior and throughput unchanged. 332/332 tests pass; channel
fan-out and stream numbers within noise of pre-refactor.
Also renamed: the old "legacy" pause/resume + adaptive-yield pipe moved to
pipeIterableLegacy (still used by pipeToPort for AsyncGenerator args that
haven't migrated to atomics).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
channel(source, { highWaterMark }) now plumbs the threshold through:
- Channel stores it from opts (default 16)
- findReadyConsumer uses it for the cap check
- addConsumer returns it in the per-consumer handle
- prepareArg forwards it in the {__stream__, ...} arg
- Worker portToAsyncIterable receives it; uses it in the cap→below-cap
transition check that signals readySignal
- Legacy message-based path uses it for the pause-threshold too
Two sides both need it (producer scans against it; consumer detects
the transition to signal readiness), so it travels with the arg.
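Usage, for reference:

    const ch = channel(source, { highWaterMark: 32 }); // default is 16
    // producer side: findReadyConsumer skips consumers with inflight >= 32
    // consumer side: bumps readySignal on the 32 -> 31 transition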
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
dispatchStream was reading opts.highWaterMark into a local but never
forwarding it to the worker. Worker-side pipeIterable was defaulting
to its own hardcoded 16, so user-supplied highWaterMark had no effect
after the atomics refactor.
Include highWater in the dispatch message; handleStreamTask passes it
to pipeIterable's flags-branch so the producer parks at the requested
threshold.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Hybrid design: keep the single tryPull loop on the parent (no
contention on source.next()) but replace message-based pause/resume
with atomics.
- Each consumer has its own PipeFlags (inflight + state); consumer-
side portToAsyncIterable decrements inflight on pull.
- A single Int32Atomic readySignal is shared across all consumers of
the channel. Consumers bump readySignal on the cap→below-cap
transition (only when it matters, to avoid spurious distributor
wakeups). The distributor parks on readySignal when every consumer
is at cap.
- Stream-arg shape extended: {__stream__, port, flags, readySignal?}.
Channel sets readySignal; pipeToPort / worker-generator streams
don't (they use inflight directly).
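Consumer-side transition check, sketched (assumes sub() returns the
previous value, matching Atomics.sub):

    // Hypothetical pull path in portToAsyncIterable:
    const prev = flags.inflight.sub(1);
    if (readySignal && prev === highWater) {
      // cap -> below-cap transition: wake the distributor only when it matters
      readySignal.add(1);
      readySignal.notify();
    }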
Tradeoff vs the previous message-based adaptive-yield Distributor:
                   before     after      delta
    1 consumer     ~614K/s    ~490K/s    -20%
    2 consumers    ~646K/s    ~680K/s     +5%
    4 consumers    ~590K/s    ~640K/s     +8%
    8 consumers    ~412K/s    ~546K/s    +33%
Scales much better past 2 consumers — message-based pause/resume
added per-tick overhead that grew with consumer count. The 1-consumer
regression is the fixed per-item atomic cost (inflight.sub on every
pull, transitions checked). Channel's value is fan-out; a 1-consumer
use case should typically be a regular stream.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add noopStreamGen and noopConsume moroutines + matching benches that
pit 1-direction stream (worker→parent) against 1-direction channel
(parent→worker) on equal footing — both are one cross-thread hop per
item.
Findings (on this branch, HEAD):
  stream  (worker generates → parent consumes)   ~746K items/s
  channel (parent generates → worker consumes)   ~652K items/s
  stream round-trip (parent → worker → parent)   ~464K items/s
vs unoptimized (bb41d5d):
  stream 1-dir:   ~66K items/s  (+11.3x)
  channel 1-dir:  ~67K items/s  (+9.7x)
Two takeaways:
- The earlier "channel beats stream" observation was an artifact of
comparing channel's 1-hop case to stream's 2-hop (pass-through)
noopStream-with-input case. When measured 1-direction-vs-1-direction,
stream and channel are tied pre-optimization.
- Post-optimization, stream is ~15% faster than channel — that's the
win from atomics-based backpressure over message+adaptive-yield.
Applying atomics to the Distributor would likely close that gap.
Also keeps channelFanout at varying worker counts as a separate
section — shows the Distributor's single-threaded producer bottleneck
(peak at ~2 consumers, regresses past 4).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Dogfood moroutine's own shared-memory primitives for the stream
backpressure path instead of reinventing the Atomics layer.
- Add `waitAsync(expected, timeoutMs?)` and `notify(count?)` methods
to Int32Atomic, wrapping `Atomics.waitAsync` / `Atomics.notify`.
Returns 'ok' | 'not-equal' | 'timed-out' from the wait. 5 new tests.
- Replace raw Int32Array + Atomics.* calls in pipe.ts with Int32Atomic
method calls. A PipeFlags struct holds two Int32Atomic views
(inflight at offset 0, state at offset 4) over a single 8-byte SAB.
- Expose newPipeFlags() / pipeFlagsFromBuffer() so callers only work
with typed wrappers; the raw SharedArrayBuffer is passed over the
dispatch message edge but both sides reconstruct PipeFlags from it.
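Usage sketch (assumes waitAsync resolves to the result string and that a
plain mutator like add() exists):

    const flags = newPipeFlags();
    // One side parks while the slot still holds the expected value:
    const result = await flags.state.waitAsync(0, 1_000); // 'ok' | 'not-equal' | 'timed-out'
    // The other side changes the value and wakes waiters:
    flags.state.add(1);   // any write that changes the value
    flags.state.notify(); // parked waiter resolves with 'ok'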
Behavior unchanged — same throughput (~450K/s) and backpressure
tightness (~89ms steady latency with 5ms-per-item consumer).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces the pause/resume message protocol on the worker-to-parent
stream path with a shared Int32Array of two slots: INFLIGHT and STATE.
Worker increments INFLIGHT after each emit and, on hitting HIGH_WATER,
parks in Atomics.waitAsync(flags, INFLIGHT, current). Parent decrements
INFLIGHT + Atomics.notify on each pull. Cancellation is STATE=CANCEL +
notify; worker checks STATE after each wake.
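The worker-side half of the protocol, roughly (slot indices and constants
assumed):

    const INFLIGHT = 0, STATE = 1, HIGH_WATER = 16, CANCEL = 1;
    // flags: Int32Array over the SharedArrayBuffer shared with the parent
    declare const flags: Int32Array;

    // after each emit:
    const current = Atomics.add(flags, INFLIGHT, 1) + 1; // value after this emit
    if (current >= HIGH_WATER) {
      const wait = Atomics.waitAsync(flags, INFLIGHT, current);
      if (wait.async) await wait.value; // park until the parent pulls + notifies
    }
    if (Atomics.load(flags, STATE) === CANCEL) return; // checked after each wake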
Eliminates:
- 'pause'/'resume' postMessage round-trip
- Adaptive setImmediate yield per-emit (no longer needed; worker parks
directly on the atomic when backpressured, runs at microtask speed
otherwise)
Only wired through dispatchStream + handleStreamTask. pipeToPort and
channel Distributor still use the message-based adaptive-yield path via
the opts.flags-less branch in pipeIterable.
Results (M-series mac, Node v24.15.0, noopStream, 100K items):
                                 before     after
    stream throughput            ~410K/s    ~455K/s    +11%
    backpressure buffer (avg)    184 ms     89 ms      ~2x tighter
    backpressure buffer (max)    991 ms     96 ms      ~10x tighter
    backpressure steady-state    growing    stable at ~89 ms
Max queued items drops from ~200 to ~18 (HIGH_WATER + 1-2 overshoot),
within a factor of 2 of the theoretical minimum.
Follow-up work: apply the same pattern to pipeToPort (parent->worker
iterable args) and channel Distributor (1:N fan-out).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
One helper in src/pipe.ts now owns the pause/resume/close wiring,
value serialization + transferable collection, and the adaptive
yield state machine. Used by:
- worker-entry.ts handleStreamTask — worker-to-parent stream emission
- execute.ts pipeToPort — parent-to-worker iterable arg piping (now
gains adaptive yield it was previously missing, fixing a latent
starvation bug if the main thread was otherwise idle)
- channel.ts Distributor.tryPull kept separate (multi-consumer
work-stealing doesn't fit the 1:1 shape) but gets the same
adaptive-yield inline for consistency
Behavioral change: pipeToPort now yields adaptively. It previously
never yielded, which was fine when the main thread had other I/O but
could starve pause delivery in pure-CPU pipelines.
Surprising side effect: extracting the worker-entry emit loop into
its own function pushed streaming throughput from ~85K items/s to
~390K items/s for a purely-synchronous generator — same logic,
much better JIT optimization once the emit loop isn't inlined in
the large handleStreamTask async function. A 4.6x win on top of
the 2.1x the adaptive-yield change already gave.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two cleanups on worker-entry.ts:
1. Arg resolution now lives as a small loop at each call site (value
task, stream task, and recursive task-arg) instead of behind a
resolveArgs helper that returned unknown[] | Promise<unknown[]>.
A `needsAsyncResolve(arg)` helper DRYs up the
`MessagePort | task-arg` check. The hot path (value task) keeps the
sync-prologue + async-tail structure inline so fully-sync args
don't enter async mode at all.
2. The streaming emit loop uses an adaptive setImmediate instead of yielding
after every emit. It arms a setImmediate whose callback flips a `ticked`
flag. If the generator's own awaits naturally tick the event loop between
emits, the flag flips and the forced yield is skipped; otherwise a streak
of YIELD_EVERY (=16) emits without a tick triggers a forced yield so
pause/close messages can reach us.
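In sketch form (source and emit are stand-ins):

    const YIELD_EVERY = 16;
    let ticked = false;
    const arm = () => { ticked = false; setImmediate(() => { ticked = true; }); };
    arm();
    let streak = 0;
    for await (const value of source) {
      emit(value);
      if (ticked) { streak = 0; arm(); } // the loop ticked naturally: skip the yield
      else if (++streak >= YIELD_EVERY) {
        await new Promise<void>(resolve => setImmediate(resolve)); // forced yield
        streak = 0; arm();
      }
    }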
Net effect: pure-CPU generators jump from ~40K items/s to ~85K (the
fixed-yield-every-16 ceiling); I/O-backed generators should approach
the no-yield ceiling (~489K) because their awaits tick naturally.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The previous optimization cached the fn and added a sync fast path for
resolveArgs, but still awaited the results unconditionally — which costs a
microtask hop even when the callee returns synchronously.
This change threads "T | Promise<T>" through the hot path and uses
`instanceof Promise` checks to only await when there's actually
something to wait for:
- resolveFn returns a Fn directly when cached
- resolveArgs builds the result array imperatively, switching to an
async helper only at the first arg that needs async resolution
- invokeAndRespond checks whether the moroutine's return value is a
Promise before awaiting it
- the 'message' listener is no longer async; value-task handling runs
fully synchronously when nothing in the chain returns a Promise
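The Promise-avoiding shape, sketched (response message format assumed):

    import type { MessagePort } from 'node:worker_threads';

    function invokeAndRespond(fn: (...a: unknown[]) => unknown, args: unknown[], port: MessagePort) {
      const out = fn(...args);
      if (out instanceof Promise) {
        // async tail only when the moroutine really returned a Promise
        out.then(value => port.postMessage({ value }),
                 error => port.postMessage({ error }));
      } else {
        port.postMessage({ value: out }); // fully synchronous round-trip
      }
    }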
Streaming tasks use their own path (handleStreamTask) which remains
async — they're inherently long-lived and the async overhead doesn't
dominate.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two small savings on the worker's per-task hot path:
1. Cache the resolved moroutine function by id. Previously every
task did id.slice() + imported.has() + registry.get() every call;
now it's a single fnCache.get() after the first call per
moroutine. Applies to both handleTask and task-arg resolution.
2. When a task's args contain no MessagePort and no task-args,
skip Promise.all and synchronously map through deserializeArg.
Avoids allocating N promises + awaiting Promise.all for tasks
with primitive args (the common case).
Eliminates ~3 map/string ops and N promise allocations per call
after warmup. Small but measurable gain in dispatch throughput.
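The cache, sketched (slow path elided):

    const fnCache = new Map<string, (...args: unknown[]) => unknown>();

    function resolveFn(id: string) {
      const cached = fnCache.get(id); // single lookup on the hot path
      if (cached) return cached;
      const fn = lookupInRegistry(id); // hypothetical: the old slice/has/get dance
      fnCache.set(id, fn);
      return fn;
    }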
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Measures pure round-trip dispatch overhead with a noop moroutine —
the task does essentially no work, so timings reflect the cost of
postMessage + event loop re-entry per task.
Shapes measured:
- Ping-pong latency (strict await-each, 1 worker)
- Pipelined throughput (1 worker, variable in-flight window)
- Parallel throughput (N workers, Promise.all)
- Batch arg vs per-task vs streaming (1 worker, 100K items)
The batch-vs-stream comparison is particularly informative: batching
100K items into a single task arg is ~34x faster than per-task
dispatch, while the current streaming implementation is ~8x slower
than per-task because of the per-item setImmediate yield used to
keep pause/resume backpressure responsive.
Used together with docs/atomics-bench/ (local-only research notes) to
evaluate Piscina's atomics technique — the conclusion lives on the
atomics-dispatch branch; this bench stays useful regardless.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Published 1.0.0 threw "Cannot find module .../dist/worker-entry.ts"
because the worker entry URL was hardcoded as a string literal. tsc's
extension rewriting only handles import specifiers, so .ts leaked into
dist/*.js. Now derived from import.meta.url — src picks .ts, dist picks .js.
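The derivation, roughly:

    // In src this module resolves as .ts, in dist as .js, so the worker
    // entry inherits whichever extension the running module has.
    const ext = import.meta.url.endsWith('.ts') ? '.ts' : '.js';
    const workerEntryUrl = new URL(`./worker-entry${ext}`, import.meta.url);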
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add a paragraph under Load Balancing showing how isTask() narrows a
task to a specific moroutine's descriptor shape, with a keyAffinity
balancer example. Link to examples/worker-affinity for the full demo.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
A custom Balancer routes tasks to workers by hashing a shard key
extracted from task args via isTask() narrowing. Demonstrates how
per-worker state (a Map in this demo) stays consistent when calls
for the same key are pinned to the same worker.
Output contrasts round-robin (counts split across workers, wrong
totals) with keyAffinity (counts consistent, correct totals).
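In outline (Balancer signature, hash helper, and moroutine name all
hypothetical):

    // Hypothetical shard-key hash; any stable string hash works.
    const hash = (s: string) =>
      [...s].reduce((h, c) => (h * 31 + c.charCodeAt(0)) | 0, 0) >>> 0;

    const keyAffinity = (task: Task, workers: Worker[]): Worker => {
      if (isTask(increment, task)) { // narrows task.args to increment's arg tuple
        const [key] = task.args;     // shard key, typed via narrowing
        return workers[hash(key) % workers.length]; // same key -> same worker
      }
      return workers[0];             // fallback for other moroutines
    };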
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Moroutines returned by mo() expose a readonly .id
- isTask(mo, task) narrows task to Task<T, A> inferred from the
moroutine's return type; useful when a pool handles tasks from
multiple moroutines and you want to recover the specific shape
- Task<T, A>.args is now typed as A (previously unknown[]), so
narrowing propagates through task.args — backward compatible
for Task<T> where A defaults to unknown[]
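For example (moroutine shape and Moroutine type name hypothetical):

    declare const hashFile: Moroutine<(path: string) => Promise<string>>;

    function describe(task: Task) {
      if (isTask(hashFile, task)) {
        const [path] = task.args; // narrowed: args are [string], result is string
        return `hashing ${path}`;
      }
      return 'unknown task';
    }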
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- inert(task) returns a plain task descriptor without PromiseLike or
AsyncIterable protocols, safe to yield from an (async) generator
without triggering auto-await
- map(run, items, { concurrency, signal }) dispatches an iterable or
async iterable of tasks to a Runner with bounded concurrency, yielding
results in completion order
- Accepts mixed task types: Task<string> | Task<number> yields string | number
- Supports AbortSignal for stream cancellation; moroutine auto-transfers
signals passed as task args so in-flight work can observe the same abort
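Usage sketch (moroutine name hypothetical):

    const paths = ['a.txt', 'b.txt'];

    async function* jobs(files: string[]) {
      for (const f of files) {
        yield inert(hashFile(f)); // plain descriptor: safe to yield, no auto-await
      }
    }

    for await (const digest of map(run, jobs(paths), { concurrency: 8 })) {
      console.log(digest); // results arrive in completion order
    }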
Includes test coverage, a bounded-map example that hashes a directory
tree via recursive async generator, and a README section.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Task<T, A> now carries result and arg types via brand symbols, without
PromiseLike or AsyncIterable protocols. Live tasks returned by mo()
intersect with the appropriate protocol for awaiting/iteration.
Enables accurate result-type inference for helpers like map() that
consume inert Task values — the bare descriptor can be yielded from
a generator or held in arrays without triggering auto-await.
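The branding idea, sketched (symbol names hypothetical):

    declare const RESULT: unique symbol;
    declare const ARGS: unique symbol;

    // Inert descriptor: phantom type parameters only, no runtime protocols.
    interface Task<T = unknown, A extends unknown[] = unknown[]> {
      readonly [RESULT]?: T;
      readonly [ARGS]?: A;
    }

    // Live value task as returned by mo(): descriptor plus awaiting protocol.
    type LiveTask<T> = Task<T> & PromiseLike<T>;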
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Errors are now re-thrown from an async catch block inside trackValue
(pool) and runOnDedicated (dedicated), with the worker error attached as
cause. V8's async stack traces then extend the new error with the
caller's await chain, so users see where run(), exec(), or await task
was called.
Adds one microtask hop per dispatch. The error subclass identity
and cause chain are preserved.
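The pattern, sketched (subclass re-construction is an assumed mechanism):

    async function trackValue<T>(dispatch: Promise<T>): Promise<T> {
      try {
        return await dispatch;
      } catch (err) {
        const e = err as Error;
        // Re-create with the same constructor so subclass identity survives;
        // the new error is born in the caller's async context, so V8 extends
        // its stack with the awaiting chain. The original travels as cause.
        const Ctor = e.constructor as new (msg: string, opts?: { cause?: unknown }) => Error;
        throw new Ctor(e.message, { cause: e });
      }
    }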
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Errors thrown in moroutines now transfer via structured clone
instead of extracting message strings. This preserves stack
traces pointing to the actual source, built-in subclass identity
(TypeError, RangeError, etc.), and cause chains. Applies to
regular dispatch, pool dispatch, and streaming tasks.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Task<T> is now a single conditional type: PromiseLike<T> for value
tasks, AsyncIterable<T> for streaming tasks, or the base dispatch
shape when unparameterized. Classes renamed to PromiseLikeTask<T>
and AsyncIterableTask<T>.
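Roughly (exact discriminant assumed; BaseTask is a hypothetical name for
the dispatch shape):

    type Task<T = void> =
      [T] extends [void] ? BaseTask :                       // unparameterized
      T extends AsyncIterable<infer U> ? AsyncIterableTask<U>
                                       : PromiseLikeTask<T>;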
BREAKING CHANGE: Task and StreamTask classes renamed; public type
signatures now use Task<T> throughout.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Pool workers are now ref'd for the lifetime of the pool, preventing the
Node.js event loop from exiting prematurely when using top-level await.
Dedicated workers ref/unref with active counting around each dispatch.
Moves ref/unref responsibility out of the shared dispatch layer
(execute/setupWorker) and into the callers — pool workers stay ref'd,
dedicated workers track an active count and only unref when idle.
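Dedicated-worker side, sketched:

    import type { Worker } from 'node:worker_threads';

    let active = 0;
    function withRef<T>(worker: Worker, work: Promise<T>): Promise<T> {
      if (active++ === 0) worker.ref(); // first in-flight dispatch keeps the loop alive
      return work.finally(() => {
        if (--active === 0) worker.unref(); // idle again: the process may exit
      });
    }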
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Move mo() calls from test files to fixtures (stream-context, stream-pipeline)
- Fix unhandled rejection from track().finally() in worker-pool
- Fix error test to use await using for clean disposal
- Drop --experimental-strip-types (Node 24 native) and --test-force-exit
- Quote test glob so Node expands it (shell ** misses top-level files)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Shell glob ** doesn't match files in the immediate directory in sh,
only subdirectories. Quoting lets Node's test runner handle the glob
which correctly matches test/*.test.ts and test/**/*.test.ts.
Node 24 strips types natively so the flag is unnecessary.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>