commits
/dev/fd is not available on all Linux distros (notably NixOS).
Use /proc/self/fd on Linux, /dev/fd on macOS.
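A minimal sketch of the platform switch (constant and helper names are illustrative, not the identifiers in src/):

    import { platform } from "node:os";

    // /dev/fd is a devfs entry on macOS but only a compatibility symlink
    // that some Linux distros (e.g. NixOS) omit; /proc/self/fd is always
    // present on Linux.
    const FD_DIR = platform() === "linux" ? "/proc/self/fd" : "/dev/fd";

    const pathForFd = (fd: number): string => `${FD_DIR}/${fd}`;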
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The previous 50ms setTimeout was unreliable — the worker may not have
booted and started consuming fds yet. Now:
- in-flight test: waits for HTTP response headers (worker is mid-flight)
- drainTimeout test: waits for TCP connect + 100ms grace for fd transit
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- spike.test.ts: use /dev/fd dup approach (matching production) instead
of _handle=null which left an orphaned libuv handle keeping the
process alive
- channel-fanout.test.ts: relax starvation assertion from >= 50 to > 0;
under full-suite CPU contention the skip-based RR legitimately
concentrates items on faster workers
- pool-ref.test.ts: increase child process timeout from 5s to 15s to
accommodate worker boot under load
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Without the rotating offset, sequential requests that arrive after the
previous one completed (counter back to 0) always land on worker 0.
Now ties are broken by a rotating offset — same approach as the core
roundRobin Balancer.
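A sketch of the tie-break, assuming a conns-based least-loaded scan (shapes are illustrative):

    type Thread = { conns: number };

    let offset = 0;

    function pick(threads: Thread[]): Thread {
      // Start the scan at a rotating offset so that when every counter is
      // equal (e.g. all back to 0), successive picks don't all land on
      // thread 0.
      let best = offset % threads.length;
      for (let step = 1; step < threads.length; step++) {
        const i = (offset + step) % threads.length;
        if (threads[i].conns < threads[best].conns) best = i;
      }
      offset = (offset + 1) % threads.length;
      return threads[best];
    }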
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The balancer now receives ServerThread[] ({ worker, conns }) and returns
the selected thread — disambiguates from moroutine's core Balancer
interface and gives custom balancers access to WorkerHandle properties.
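Assumed shape of the new contract (field types are stand-ins for the real WorkerHandle):

    interface ServerThread {
      worker: unknown; // WorkerHandle in the real code
      conns: number;   // live connection count for this thread
    }

    // The serve-layer balancer receives threads and returns one of them.
    type ServeBalancer = (threads: ServerThread[]) => ServerThread;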
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Instead of passing the entire counters tuple + slot index, pass the
individual Int32Atomic for this worker's slot. Simpler signature,
fewer args, less indirection in listen().
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- closeSync(fd) in the catch path prevents fd leak when a PushChannel
is already closed during shutdown.
- Pass highWaterMark through to channel() so backpressure is enforced
at the cross-thread pipe layer.
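The guard, sketched (PushChannel surface assumed from the neighboring commits):

    import { closeSync } from "node:fs";

    function handOffFd(fd: number, channel: { send(fd: number): void }): void {
      try {
        channel.send(fd);
      } catch {
        // The channel can already be closed mid-shutdown; close the fd
        // ourselves instead of leaking it.
        closeSync(fd);
      }
    }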
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Move PushChannel/pushChannel out of the public moroutine API and into
src/serve/ as an internal helper. listen() and ListenArgs now accept the
general AsyncIterable<number> interface so moroutine can transparently
transfer the iterable cross-thread without coupling to a specific type.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Also adds PushChannel<T>/pushChannel() as a push-based async-iterable
channel (with .send()/.close()), required by listen() and serverThreads.
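A minimal single-consumer sketch of that surface (the real implementation also handles backpressure and multiple waiters; everything here is an assumption):

    function pushChannel<T>() {
      const buffer: T[] = [];
      let closed = false;
      let wake: (() => void) | undefined;

      return {
        send(value: T): void {
          if (closed) throw new Error("channel closed");
          buffer.push(value);
          wake?.(); // rouse a consumer parked in the iterator
        },
        close(): void {
          closed = true;
          wake?.();
        },
        async *[Symbol.asyncIterator]() {
          for (;;) {
            if (buffer.length) { yield buffer.shift()!; continue; }
            if (closed) return;
            await new Promise<void>((resolve) => (wake = resolve));
            wake = undefined;
          }
        },
      };
    }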
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
serializeStructField had a SharedStruct branch but no Tuple branch, so a
Tuple used as a SharedStruct field (or as a Tuple element) fell through
to the primitive-shape branch and lost its `elements`. Worker-side
deserialization then crashed on `data.elements.map(...)` with
`Cannot read properties of undefined (reading 'map')`. Mirror the
top-level serializeArg Tuple case in the recursive helper so nested
tuples carry their elements across.
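The shape of the fix, sketched with stand-in types (the real serialized format is richer):

    interface TupleLike { elements: unknown[] }

    const isTuple = (v: unknown): v is TupleLike =>
      typeof v === "object" && v !== null && Array.isArray((v as TupleLike).elements);

    type Serialized =
      | { kind: "tuple"; elements: Serialized[] }
      | { kind: "primitive"; value: unknown };

    function serializeStructField(v: unknown): Serialized {
      // (SharedStruct branch elided)
      if (isTuple(v)) {
        // Mirror the top-level serializeArg case: recurse so nested tuples
        // keep their elements across the postMessage boundary.
        return { kind: "tuple", elements: v.elements.map(serializeStructField) };
      }
      return { kind: "primitive", value: v };
    }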
Adds cross-worker regression tests for struct-with-tuple-of-primitives,
struct-with-tuple-of-structs, and tuple-of-tuples.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Previously findReadyConsumer scanned from index 0 and picked the first
below-cap consumer, so consumer 0 won nearly every tie and the first
worker received a disproportionate share of items. Now the scan starts
from a rotating cursor, giving every below-cap consumer a turn.
Skip-based RR semantics are preserved — consumers at cap are still
skipped so a saturated worker doesn't stall the pipeline. Adds a
dedicated benchmark (examples/benchmark-dispatch/fanout.ts) that
measures distribution spread at different volumes and under
backpressure; tightens the fan-out tests to use the
run.workers.map(assign) pattern so results don't depend on the pool's
default balancer.
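The cursor mechanics, sketched (consumer shape assumed):

    type Consumer = { inflight: number };

    let cursor = 0;

    function findReadyConsumer(consumers: Consumer[], cap: number): Consumer | null {
      for (let step = 0; step < consumers.length; step++) {
        const i = (cursor + step) % consumers.length;
        if (consumers[i].inflight < cap) {
          cursor = (i + 1) % consumers.length; // next scan starts past the winner
          return consumers[i];
        }
      }
      return null; // everyone at cap — distributor parks until a consumer drains
    }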
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Remove unused RUN export from pipe.ts
- Remove unused CHANNEL import from execute.ts
- Generalize ChannelOptions.highWaterMark tsdoc (streams + channels)
- README: add Int32Atomic waitAsync/notify section; note atomics backpressure
- Add backpressure tests verifying producer actually stalls when consumer
is slow — peak (emitted - consumed) gap stays within highWater + slop
- Add .changeset/stream-atomics.md (minor bump)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Remove pipeIterableLegacy, the pipeToPort wrapper, and portToAsyncIterable's
message-based branch. AsyncGenerator and AsyncIterableTask args now produce
a serialized StreamHandle the same way Channel does, and the worker
receives one via portToAsyncIterable's atomics path.
All stream-arg producers now use pipeIterable with a StreamHandle; the
worker always receives a StreamHandle. One code path, one data shape,
no more "legacy vs atomics" branches.
~150 lines removed. Tests 332/332. No bench change observed — the
round-trip stream bottleneck moved to the worker-side passthrough
generator, not the transport. Correctness/consistency win, not a perf
win.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Unify how "one streaming endpoint" is represented and transported:
- PipeFlags is now a SharedStruct<{inflight, state}> allocated via
moroutine's own shared({...}) primitive. No more hand-rolled byte
offsets or manual Int32Atomic construction over a raw SAB.
- StreamHandle is a first-class object carrying {port, flags,
readySignal?, highWater} — the complete description of one end of
a backpressured stream.
- serializeStreamHandle / deserializeStreamHandle / isSerializedStreamHandle
round-trip a handle through postMessage using the existing
serializeArg/deserializeArg machinery. No custom "rebuild from
buffer" helper — the shared-primitive layer already does this.
Callers no longer juggle free-floating {port, flags: SAB, readySignal:
SAB, highWater: number}; they pass a StreamHandle. dispatchStream's
message now has a single `stream` field; Channel.addConsumer returns
a StreamHandle directly; prepareArg serializes it for transport.
Behavior and throughput unchanged. 332/332 tests pass; channel
fan-out and stream numbers within noise of pre-refactor.
Renamed: moved the old "legacy" pause/resume + adaptive-yield pipe
to pipeIterableLegacy (still used by pipeToPort for AsyncGenerator
args that haven't migrated to atomics).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
channel(source, { highWaterMark }) now plumbs the threshold through:
- Channel stores it from opts (default 16)
- findReadyConsumer uses it for the cap check
- addConsumer returns it in the per-consumer handle
- prepareArg forwards it in the {__stream__, ...} arg
- Worker portToAsyncIterable receives it; uses it in the cap→below-cap
transition check that signals readySignal
- Legacy message-based path uses it for the pause-threshold too
Two sides both need it (producer scans against it; consumer detects
the transition to signal readiness), so it travels with the arg.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
dispatchStream was reading opts.highWaterMark into a local but never
forwarding it to the worker. Worker-side pipeIterable was defaulting
to its own hardcoded 16, so user-supplied highWaterMark had no effect
after the atomics refactor.
Include highWater in the dispatch message; handleStreamTask passes it
to pipeIterable's flags-branch so the producer parks at the requested
threshold.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Hybrid design: keep the single tryPull loop on the parent (no
contention on source.next()) but replace message-based pause/resume
with atomics.
- Each consumer has its own PipeFlags (inflight + state); consumer-
side portToAsyncIterable decrements inflight on pull.
- A single Int32Atomic readySignal is shared across all consumers of
the channel. Consumers bump readySignal on the cap→below-cap
transition (only when it matters, to avoid spurious distributor
wakeups). The distributor parks on readySignal when every consumer
is at cap.
- Stream-arg shape extended: {__stream__, port, flags, readySignal?}.
Channel sets readySignal; pipeToPort / worker-generator streams
don't (they use inflight directly).
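The consumer-side edge detection, sketched against raw Atomics (the real code goes through the Int32Atomic wrappers; both views are over SharedArrayBuffers):

    function onPull(inflight: Int32Array, readySignal: Int32Array | null, cap: number): void {
      const prev = Atomics.sub(inflight, 0, 1); // returns the value before the subtract
      // Signal only on the cap -> below-cap edge, so the distributor isn't
      // woken by pulls that can't change its "everyone at cap" verdict.
      if (readySignal && prev === cap) {
        Atomics.add(readySignal, 0, 1);
        Atomics.notify(readySignal, 0);
      }
    }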
Tradeoff vs the previous message-based adaptive-yield Distributor:
                  before     after      delta
  1 consumer      ~614K/s    ~490K/s    -20%
  2 consumers     ~646K/s    ~680K/s    +5%
  4 consumers     ~590K/s    ~640K/s    +8%
  8 consumers     ~412K/s    ~546K/s    +33%
Scales much better past 2 consumers — message-based pause/resume
added per-tick overhead that grew with consumer count. The 1-consumer
regression is the fixed per-item atomic cost (inflight.sub on every
pull, transitions checked). Channel's value is fan-out; a 1-consumer
use case should typically be a regular stream.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add noopStreamGen and noopConsume moroutines + matching benches that
pit 1-direction stream (worker→parent) against 1-direction channel
(parent→worker) on equal footing — both are one cross-thread hop per
item.
Findings (on this branch, HEAD):
  stream  (worker generates → parent consumes)    ~746K items/s
  channel (parent generates → worker consumes)    ~652K items/s
  stream round-trip (parent → worker → parent)    ~464K items/s
vs unoptimized (bb41d5d):
  stream 1-dir:   ~66K items/s  (+11.3x)
  channel 1-dir:  ~67K items/s  (+9.7x)
Two takeaways:
- The earlier "channel beats stream" observation was an artifact of
comparing channel's 1-hop case to stream's 2-hop (pass-through)
noopStream-with-input case. When measured 1-direction-vs-1-direction,
stream and channel are tied pre-optimization.
- Post-optimization, stream is ~15% faster than channel — that's the
win from atomics-based backpressure over message+adaptive-yield.
Applying atomics to the Distributor would likely close that gap.
Also keeps channelFanout at varying worker counts as a separate
section — shows the Distributor's single-threaded producer bottleneck
(peak at ~2 consumers, regresses past 4).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Dogfood moroutine's own shared-memory primitives for the stream
backpressure path instead of reinventing the Atomics layer.
- Add `waitAsync(expected, timeoutMs?)` and `notify(count?)` methods
to Int32Atomic, wrapping `Atomics.waitAsync` / `Atomics.notify`.
Returns 'ok' | 'not-equal' | 'timed-out' from the wait. 5 new tests.
- Replace raw Int32Array + Atomics.* calls in pipe.ts with Int32Atomic
method calls. A PipeFlags struct holds two Int32Atomic views
(inflight at offset 0, state at offset 4) over a single 8-byte SAB.
- Expose newPipeFlags() / pipeFlagsFromBuffer() so callers only work
with typed wrappers; the raw SharedArrayBuffer is passed over the
dispatch message edge but both sides reconstruct PipeFlags from it.
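The two wrappers reduce to thin calls over the view; a sketch (constructor shape assumed, class renamed to avoid claiming the real export):

    class Int32AtomicSketch {
      constructor(private view: Int32Array, private index = 0) {}

      // Resolves 'ok' when notified, 'not-equal' if the value had already
      // changed, 'timed-out' on timeout — the three statuses noted above.
      async waitAsync(expected: number, timeoutMs = Infinity) {
        return Atomics.waitAsync(this.view, this.index, expected, timeoutMs).value;
      }

      notify(count = Infinity): number {
        return Atomics.notify(this.view, this.index, count);
      }
    }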
Behavior unchanged — same throughput (~450K/s) and backpressure
tightness (~89ms steady latency with 5ms-per-item consumer).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces the pause/resume message protocol on the worker-to-parent
stream path with a shared Int32Array of two slots: INFLIGHT and STATE.
Worker increments INFLIGHT after each emit and, on hitting HIGH_WATER,
parks in Atomics.waitAsync(flags, INFLIGHT, current). Parent decrements
INFLIGHT + Atomics.notify on each pull. Cancellation is STATE=CANCEL +
notify; worker checks STATE after each wake.
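The worker-side park loop, sketched (slot layout and constants from this message; surrounding plumbing assumed):

    const INFLIGHT = 0, STATE = 1, CANCEL = 1, HIGH_WATER = 16;

    async function emitLoop<T>(
      gen: AsyncIterable<T>,
      flags: Int32Array,          // 2-slot view over the shared SAB
      post: (value: T) => void,   // port.postMessage to the parent
    ): Promise<void> {
      for await (const value of gen) {
        post(value);
        const inflight = Atomics.add(flags, INFLIGHT, 1) + 1;
        if (inflight >= HIGH_WATER) {
          // Park until the parent's pull decrements INFLIGHT and notifies.
          const wait = Atomics.waitAsync(flags, INFLIGHT, inflight);
          if (wait.async) await wait.value;
        }
        if (Atomics.load(flags, STATE) === CANCEL) return; // checked after each wake
      }
    }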
Eliminates:
- 'pause'/'resume' postMessage round-trip
- Adaptive setImmediate yield per-emit (no longer needed; worker parks
directly on the atomic when backpressured, runs at microtask speed
otherwise)
Only wired through dispatchStream + handleStreamTask. pipeToPort and
channel Distributor still use the message-based adaptive-yield path via
the opts.flags-less branch in pipeIterable.
Results (M-series Mac, Node v24.15.0, noopStream, 100K items):
                              before     after
  stream throughput           ~410K/s    ~455K/s            +11%
  backpressure buffer (avg)   184 ms     89 ms              ~2x tighter
  backpressure buffer (max)   991 ms     96 ms              ~10x tighter
  backpressure steady-state   growing    stable at ~89ms
Max queued items drops from ~200 to ~18 (HIGH_WATER + 1-2 overshoot),
within a factor of 2 of the theoretical minimum.
Follow-up work: apply the same pattern to pipeToPort (parent->worker
iterable args) and channel Distributor (1:N fan-out).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
One helper in src/pipe.ts now owns the pause/resume/close wiring,
value serialization + transferable collection, and the adaptive
yield state machine. Used by:
- worker-entry.ts handleStreamTask — worker-to-parent stream emission
- execute.ts pipeToPort — parent-to-worker iterable arg piping (now
gains adaptive yield it was previously missing, fixing a latent
starvation bug if the main thread was otherwise idle)
- channel.ts Distributor.tryPull kept separate (multi-consumer
work-stealing doesn't fit the 1:1 shape) but gets the same
adaptive-yield inline for consistency
Behavioral change: pipeToPort now yields adaptively. It previously
never yielded, which was fine when the main thread had other I/O but
could starve pause delivery in pure-CPU pipelines.
Surprising side effect: extracting the worker-entry emit loop into
its own function pushed streaming throughput from ~85K items/s to
~390K items/s for a purely-synchronous generator — same logic,
much better JIT optimization once the emit loop isn't inlined in
the large handleStreamTask async function. A 4.6x win on top of
the 2.1x the adaptive-yield change already gave.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two cleanups on worker-entry.ts:
1. Arg resolution now lives as a small loop at each call site (value
task, stream task, and recursive task-arg) instead of behind a
resolveArgs helper that returned unknown[] | Promise<unknown[]>.
A `needsAsyncResolve(arg)` helper DRYs up the
`MessagePort | task-arg` check. The hot-path (value task) keeps the
sync-prologue + async-tail structure inline so fully-sync args
don't enter async mode at all.
2. Streaming emit loop uses adaptive setImmediate instead of yielding
after every emit. Arms a setImmediate whose callback flips a
`ticked` flag; if the generator's own awaits naturally tick the
loop between emits the flag flips and we skip the forced yield,
otherwise a streak of YIELD_EVERY (=16) emits without a tick
triggers a forced yield so pause/close can reach us.
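The state machine in (2), sketched (YIELD_EVERY from this message; wiring into the emit loop assumed):

    const YIELD_EVERY = 16;

    function makeAdaptiveYield(): () => Promise<void> | undefined {
      let ticked = false;
      let armed = false;
      let streak = 0;

      return () => {
        if (!armed) {
          armed = true;
          setImmediate(() => { ticked = true; armed = false; });
        }
        if (ticked) {
          ticked = false; // the loop ticked on its own — skip the forced yield
          streak = 0;
          return;
        }
        if (++streak >= YIELD_EVERY) {
          streak = 0;
          return new Promise<void>((resolve) => setImmediate(resolve)); // forced yield
        }
        return;
      };
    }

A caller awaits the returned promise only when one comes back, so the hot path stays synchronous between forced yields.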
Net effect: pure-CPU generators jump from ~40K items/s to ~85K (the
fixed-yield-every-16 ceiling); I/O-backed generators should approach
the no-yield ceiling (~489K) because their awaits tick naturally.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Previous optimization cached the fn + added a sync fast-path for
resolveArgs, but still awaited them unconditionally — which costs a
microtask hop even when the callee returns synchronously.
This change threads "T | Promise<T>" through the hot path and uses
`instanceof Promise` checks to only await when there's actually
something to wait for:
- resolveFn returns a Fn directly when cached
- resolveArgs builds the result array imperatively, switching to an
async helper only at the first arg that needs async resolution
- invokeAndRespond checks whether the moroutine's return value is a
Promise before awaiting it
- the 'message' listener is no longer async; value-task handling runs
fully synchronously when nothing in the chain returns a Promise
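The pattern, sketched (names from this message; bodies illustrative):

    type Fn = (...args: unknown[]) => unknown;

    function invokeAndRespond(fn: Fn, args: unknown[], respond: (v: unknown) => void): void {
      const result = fn(...args);
      if (result instanceof Promise) {
        result.then(respond); // pay the microtask hop only when unavoidable
      } else {
        respond(result); // fully synchronous — no await, no extra tick
      }
    }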
Streaming tasks use their own path (handleStreamTask) which remains
async — they're inherently long-lived and the async overhead doesn't
dominate.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two small savings on the worker's per-task hot path:
1. Cache the resolved moroutine function by id. Previously every
task did id.slice() + imported.has() + registry.get() every call;
now it's a single fnCache.get() after the first call per
moroutine. Applies to both handleTask and task-arg resolution.
2. When a task's args contain no MessagePort and no task-args,
skip Promise.all and synchronously map through deserializeArg.
Avoids allocating N promises + awaiting Promise.all for tasks
with primitive args (the common case).
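The cache in (1), sketched (registry/import shapes assumed):

    type Fn = (...args: unknown[]) => unknown;

    const fnCache = new Map<string, Fn>();

    function resolveFn(id: string, slowResolve: (id: string) => Fn): Fn {
      let fn = fnCache.get(id);
      if (!fn) {
        fn = slowResolve(id); // slice/import/registry lookup, first call only
        fnCache.set(id, fn);
      }
      return fn;
    }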
Eliminates ~3 map/string ops and N promise allocations per call
after warmup. Small but measurable gain in dispatch throughput.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Measures pure round-trip dispatch overhead with a noop moroutine —
the task does essentially no work, so timings reflect the cost of
postMessage + event loop re-entry per task.
Shapes measured:
- Ping-pong latency (strict await-each, 1 worker)
- Pipelined throughput (1 worker, variable in-flight window)
- Parallel throughput (N workers, Promise.all)
- Batch arg vs per-task vs streaming (1 worker, 100K items)
The batch-vs-stream comparison is particularly informative: batching
100K items into a single task arg is ~34x faster than per-task
dispatch, while the current streaming implementation is ~8x slower
than per-task because of the per-item setImmediate yield used to
keep pause/resume backpressure responsive.
Used together with docs/atomics-bench/ (local-only research notes)
to evaluate Piscina's atomics technique — conclusion on the
atomics-dispatch branch; this bench stays useful regardless.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Published 1.0.0 threw "Cannot find module .../dist/worker-entry.ts"
because the worker entry URL was hardcoded as a string literal. tsc's
extension rewriting only handles import specifiers, so .ts leaked into
dist/*.js. Now derived from import.meta.url — src picks .ts, dist picks .js.
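The derivation, sketched (the real file computes this inline):

    // Running from src/ the module URL ends in .ts; from dist/ it ends in .js.
    const entryExt = import.meta.url.endsWith(".ts") ? ".ts" : ".js";
    export const workerEntryUrl = new URL(`./worker-entry${entryExt}`, import.meta.url);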
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add a paragraph under Load Balancing showing how isTask() narrows a
task to a specific moroutine's descriptor shape, with a keyAffinity
balancer example. Link to examples/worker-affinity for the full demo.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
A custom Balancer routes tasks to workers by hashing a shard key
extracted from task args via isTask() narrowing. Demonstrates how
per-worker state (a Map in this demo) stays consistent when calls
for the same key are pinned to the same worker.
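The affinity hash, sketched (balancer wiring elided; the FNV-1a hash is illustrative):

    function hashKey(key: string): number {
      let h = 2166136261;
      for (let i = 0; i < key.length; i++) {
        h = Math.imul(h ^ key.charCodeAt(i), 16777619);
      }
      return h >>> 0;
    }

    // Same key -> same index -> same worker, so per-worker Maps stay
    // consistent for that key.
    const pickWorker = (shardKey: string, workerCount: number): number =>
      hashKey(shardKey) % workerCount;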
Output contrasts round-robin (counts split across workers, wrong
totals) with keyAffinity (counts consistent, correct totals).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Moroutines returned by mo() expose a readonly .id
- isTask(mo, task) narrows task to Task<T, A> inferred from the
moroutine's return type; useful when a pool handles tasks from
multiple moroutines and you want to recover the specific shape
- Task<T, A>.args is now typed as A (previously unknown[]), so
narrowing propagates through task.args — backward compatible
for Task<T> where A defaults to unknown[]
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- inert(task) returns a plain task descriptor without PromiseLike or
AsyncIterable protocols, safe to yield from an (async) generator
without triggering auto-await
- map(run, items, { concurrency, signal }) dispatches an iterable or
async iterable of tasks to a Runner with bounded concurrency, yielding
results in completion order
- Accepts mixed task types: Task<string> | Task<number> yields string | number
- Supports AbortSignal for stream cancellation; moroutine auto-transfers
signals passed as task args so in-flight work can observe the same abort
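Hypothetical usage of the two helpers (signatures inferred from this message; Runner/hashFile are stand-ins):

    interface Task<T> { readonly args: unknown[]; readonly __result?: T }
    interface Runner { /* pool or worker runner */ }

    declare function inert<T>(task: Task<T>): Task<T>;
    declare function map<T>(
      run: Runner,
      items: Iterable<Task<T>> | AsyncIterable<Task<T>>,
      opts?: { concurrency?: number; signal?: AbortSignal },
    ): AsyncIterable<T>;

    declare function hashFile(path: string): Task<string>; // a moroutine call

    async function hashAll(run: Runner, paths: string[]): Promise<string[]> {
      const tasks = paths.map((p) => inert(hashFile(p))); // safe to hold in arrays
      const out: string[] = [];
      for await (const digest of map(run, tasks, { concurrency: 8 })) {
        out.push(digest); // completion order, not submission order
      }
      return out;
    }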
Includes test coverage, a bounded-map example that hashes a directory
tree via recursive async generator, and a README section.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Task<T, A> now carries result and arg types via brand symbols, without
PromiseLike or AsyncIterable protocols. Live tasks returned by mo()
intersect with the appropriate protocol for awaiting/iteration.
Enables accurate result-type inference for helpers like map() that
consume inert Task values — the bare descriptor can be yielded from
a generator or held in arrays without triggering auto-await.
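One way to express the brand (symbol names assumed):

    declare const RESULT: unique symbol;
    declare const ARGS: unique symbol;

    // Bare descriptor: no PromiseLike / AsyncIterable protocols, so yielding
    // it from a generator cannot trigger auto-await.
    interface InertTask<T, A extends unknown[] = unknown[]> {
      readonly id: string;
      readonly args: A;
      readonly [RESULT]?: T; // phantom brand — exists only in the type system
      readonly [ARGS]?: A;
    }

    // Live task handed back by mo(): the descriptor plus the await protocol.
    type LiveTask<T, A extends unknown[] = unknown[]> = InertTask<T, A> & PromiseLike<T>;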
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>