moroutine#
1.3.0#
Minor Changes#
-
c9a1794: Synchronous
try*variants for shared locksNew methods on
MutexandRwLock:// Non-blocking, single-attempt acquisition. // Returns a disposable guard on success, `null` on contention. mutex.tryLock(); // MutexGuard | null rwlock.tryReadLock(); // ReadGuard | null rwlock.tryWriteLock(); // WriteGuard | nullEach makes a single atomic CAS attempt — no waiting, no retry. Composes with
usingsincenullskips dispose registration:using guard = mu.tryLock(); if (!guard) return; // held elsewhere, nothing to dispose // ...critical section; auto-unlock on scope exit
1.2.2#
Patch Changes#
- cdd5c83: Fix task-arg roundtrip of
SharedStructwith aTuple-typed field (andTuplewith aTupleelement). The recursiveserializeStructFieldhelper was missing aTuplebranch, so a tuple nested inside another shared container would serialize as{ __shared__: 'Tuple' }with noelements, and worker-side deserialization crashed withCannot read properties of undefined (reading 'map').serializeStructFieldnow mirrors the top-levelserializeArgfor tuples, recursing into elements. Struct-in-struct, tuple-alone, and tuple-of-structs were already handled; struct-with-tuple-field, tuple-of-tuples, and deeper combinations now work too.
1.2.1#
Patch Changes#
-
b9b3498:
channel()fan-out now rotates consumer selection round-robin instead of always preferring the lowest-index consumer. Previously, when several consumers were belowhighWaterMark, the distributor would pickconsumers[0]every time, skewing work toward the first worker. The scan now starts from a rotating cursor, so ties distribute evenly.Skip-based RR semantics are unchanged — consumers at cap are still skipped so a saturated worker doesn't stall the pipeline. Fairness is best at higher volumes (≥10K items) and under any real backpressure, where initial warm-up asymmetry washes out. See
examples/benchmark-dispatch/fanout.tsfor measured distributions.
1.2.0#
Minor Changes#
-
24741da: Atomics-based backpressure for streams and channels; new
Int32Atomic#waitAsync/#notifyNew public API on
Int32Atomic:const slot = int32atomic(); // Park until the slot holds a value other than `expected`, with optional timeout. await slot.waitAsync(0); // 'ok' | 'not-equal' | 'timed-out' await slot.waitAsync(0, 100 /*ms*/); // Wake waiters on this slot. slot.notify(); // wake all slot.notify(2); // wake at most 2Matches
Atomics.waitAsync/Atomics.notifysemantics; returns'not-equal'synchronously (no microtask hop) if the slot already holds something different fromexpected.Streams and channels use atomics for backpressure: streaming tasks and
channel()no longer rely onpause/resumeport messages for flow control. Internally, aSharedArrayBuffertracks per-endpointinflightandstate; the producer parks onAtomics.waitAsyncwhen thehighWaterMarkcap is reached and resumes when the consumer drains below it. Behavior-equivalent and much tighter under backpressure (worst-case buffering drops from ~10-20×highWaterMarktohighWaterMark + 1-2).highWaterMarkoption now honored for bothrun(streamTask, { highWaterMark })andchannel(src, { highWaterMark }). Previously read and discarded in the streaming path.Significant throughput improvements on streaming and channel fan-out:
- Worker → parent stream: ~66K items/s → ~750K items/s (11×)
- Round-trip stream: ~39K items/s → ~425K items/s (11×)
- Channel fan-out at 8 consumers: ~412K items/s → ~546K items/s (+33%)
- Per-task dispatch: ~289K → ~328K (+14%)
No changes to user-facing API shape beyond the
Int32Atomicadditions.
1.1.0#
Minor Changes#
-
25ea1c1: Add
inert()andmap()helpers for fan-out over a worker poolinert(task)returns a plain task descriptor withoutPromiseLikeorAsyncIterableprotocols — safe to yield from an (async) generator without triggering auto-awaitmap(run, items, { concurrency, signal })dispatches an iterable or async iterable of tasks to aRunnerwith bounded concurrency, yielding results in completion order; accepts mixed task types (Task<string> | Task<number>→string | number)- New example:
examples/bounded-map— recursive directory walk hashing every file with bounded concurrency Task<T, A>now carries a type-only arg brand to enable accurate result inference throughmap(); live tasks returned bymo()continue to bePromiseLike<T>/AsyncIterable<T>as before
-
4ef12db: Add
isTask()type guard for narrowing tasks to a specific moroutineif (isTask(isPrime, task)) { // task: Task<boolean, [n: number]> const [n] = task.args; }- Moroutines returned by
mo()now expose a readonlyidfor stable identity isTask(mo, task)returnstruewhentaskwas produced bymo, and narrows the task to the descriptor type produced by that moroutineTask<T, A>.argsis now typed asA(previouslyunknown[]) so narrowing propagates to argument access — unchanged forTask<T>without a specialized arg tuple
- Moroutines returned by
Patch Changes#
- e2cebd5: Fix
Cannot find module .../dist/worker-entry.tswhen consuming the published build. The worker entry URL was hardcoded to./worker-entry.tsas a string literal, so tsc's extension rewriting (which only handles import specifiers) left it unchanged indist/*.js. Now derived fromimport.meta.urlso src uses.tsand dist uses.js.
1.0.0#
Major Changes#
- e46aca0: Unify Task and StreamTask under a single
Task<T>typeTask<T>isPromiseLike<T>for value tasks,AsyncIterable<T>for streaming tasks, or just the base dispatch shape when unparameterizedBalancer.select()receivesTaskinstead ofTask<any> | StreamTask<any>Arg<T>simplifies toT | Task<T>- Classes renamed to
PromiseLikeTask<T>andAsyncIterableTask<T>(internal, preferTask<T>in type annotations)
Minor Changes#
-
9ab6016: Graceful async worker pool shutdown
await using run = workers(4)waits for in-flight tasks to settle before terminating workersrun.signalis anAbortSignalthat fires when the pool starts disposing — thread it into tasks for cooperative cancellationworkers(size, { shutdownTimeout: ms })force-terminates workers if graceful shutdown exceeds the timeout- Existing
using run(sync dispose) still terminates immediately
-
3612d52: Include main-thread call site in error stack traces
Errors thrown during task dispatch now have stack traces that show where
run(),exec(), orawait taskwas called. The original worker-side error is preserved aserr.causewith its own stack pointing to the moroutine source.Error: boom at trackValue (worker-pool.ts:52:15) at async loadUser (user-code.ts:7:3) at async main (user-code.ts:11:3) { [cause]: Error: boom at fixtures/math.ts:6:9 // original throw site on the worker at MessagePort.<anonymous> (worker-entry.ts:173:25) }Built-in error subclass identity (
TypeError,RangeError, etc.) is preserved on the outer wrapper. -
5137201: Preserve error details across worker boundary
Errors thrown in moroutines now transfer with
message,stack,cause, and built-in subclass identity (TypeError,RangeError, etc.) preserved via structured clone. Previously only the message string was kept. -
d13cd40: Configurable load balancing for worker pools
workers(size, { balance: leastBusy() })routes tasks to the worker with the fewest in-flight tasksroundRobin()cycles through workers in order (default, same as before)- Custom balancers implement
Balancer.select(workers, task)for full control over scheduling
-
cdccfdb: Per-worker dispatch with
assign()andrun.workersrun.workersexposes a read-only array ofWorkerHandles, one per pool workerassign(worker, task)returns a copy of the task pinned to a specific workerworker.exec(task)dispatches directly to a specific worker- Channel fan-out no longer requires knowing the worker count
Patch Changes#
-
e2dabc1: Fix pool workers exiting early with top-level await
Pool workers are now ref'd for the lifetime of the pool, preventing the Node.js event loop from exiting prematurely when using
using run = workers()with top-levelawait. Dedicated workers continue to ref only while tasks are in-flight.
0.1.1#
Patch Changes#
-
5068c96: Auto-detect and transfer AbortSignal arguments to workers
AbortSignal args are automatically detected, marked transferable via
util.transferableAbortSignal(), and transferred to the worker. Works with regular tasks, streaming moroutines, and dedicated workers.
0.1.0#
Minor Changes#
- b6d4275: Initial version of moroutine: offload functions to worker threads with shared memory primitives for Node.js.