# moroutine

> Offload functions to worker threads with shared memory primitives for Node.js.

## Installation

```sh
npm install moroutine
```

Requires Node.js v24+.

## Quick Start

```ts
// main.ts
import { isPrime } from './is-prime.ts';

const result = await isPrime(999_999_937);
console.log(result); // true
```

```ts
// is-prime.ts
import { mo } from 'moroutine';

export const isPrime = mo(import.meta, (n: number): boolean => {
  if (n < 2) return false;
  for (let i = 2; i * i <= n; i++) {
    if (n % i === 0) return false;
  }
  return true;
});
```

Define a function with `mo()` in its own module, then import it and run it on a worker thread. Moroutine modules must be side-effect free — workers import them to find the registered functions.

## Core API

### `mo(import.meta, fn)`

Wraps a function so it runs on a worker thread. The function must be defined at module scope (not dynamically).

```ts
// math.ts
import { mo } from 'moroutine';

export const add = mo(import.meta, (a: number, b: number): number => {
  return a + b;
});
```

### `workers(size?, opts?)`

Creates a pool of worker threads. Returns a `Runner` that dispatches tasks. Disposable via `using` or `[Symbol.dispose]()`. Defaults to `os.availableParallelism()` workers and round-robin scheduling when arguments are omitted.

```ts
import { workers } from 'moroutine';
import { add } from './math.ts';

{
  using run = workers(2);

  const result = await run(add(3, 4)); // single task
  const [a, b] = await run([add(1, 2), add(3, 4)]); // batch
}
```

#### Graceful Shutdown

Use `await using` for graceful async shutdown. The pool exposes a `signal` that fires when disposal begins — thread it into tasks for cooperative cancellation.

```ts
{
  await using run = workers(4);
  run(longTask(run.signal)); // task can react to abort
  run(otherTask()); // runs to completion
}
// signal fired, waited for both tasks, then workers terminated
```

Use `using` (without `await`) for immediate termination, same as before.
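
How a task reacts to the pool's `signal` is up to the task itself. As a minimal sketch, assuming the task receives a standard `AbortSignal` (such as `run.signal` above), a long-running body can check `signal.aborted` between batches of work and yield to the event loop so the abort can be observed. The function name `crunch` and its batch size are hypothetical and not part of moroutine:

```ts
// Hypothetical task body, not part of moroutine. In a real module it would be
// wrapped with mo(import.meta, ...) and dispatched as run(crunch(n, run.signal)).
async function crunch(
  total: number,
  signal: AbortSignal,
): Promise<{ done: number; aborted: boolean }> {
  const BATCH = 1_000; // units of work between cancellation checks (arbitrary)
  let done = 0;

  while (done < total) {
    // Stop early if shutdown (or any other abort) has been requested.
    if (signal.aborted) return { done, aborted: true };

    const end = Math.min(done + BATCH, total);
    while (done < end) {
      done++; // stand-in for one unit of CPU work
    }

    // Yield to the event loop; a loop that never yields cannot observe
    // an abort that is delivered as an event.
    await new Promise<void>((resolve) => setTimeout(resolve, 0));
  }

  return { done, aborted: false };
}
```

Returning a partial result on abort, rather than throwing, is just one possible design; a task could equally call `signal.throwIfAborted()` and let the rejection propagate.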

A `shutdownTimeout` option force-terminates workers if graceful shutdown takes too long:

```ts
{
  await using run = workers(4, { shutdownTimeout: 5000 });
  // ...
}
```

#### Load Balancing

The pool uses round-robin scheduling by default. Pass a `balance` option to change the strategy:

```ts
import { workers, leastBusy } from 'moroutine';

{
  using run = workers(4, { balance: leastBusy() });
  // tasks dispatched to whichever worker has the fewest in-flight tasks
}
```

Built-in balancers:

- `roundRobin()` — cycles through workers in order (default)
- `leastBusy()` — picks the worker with the lowest active task count

Custom balancers implement the `Balancer` interface:

```ts
import type { Balancer, WorkerHandle, Task } from 'moroutine';

const random: Balancer = {
  select(workers: readonly WorkerHandle[], task: Task) {
    return workers[Math.floor(Math.random() * workers.length)];
  },
};
```

Each `WorkerHandle` exposes `activeCount` (in-flight tasks) and `thread` (the underlying `worker_threads.Worker`) for building custom strategies.

`isTask(moroutine, task)` narrows a task to the descriptor type produced by a specific moroutine — useful inside a balancer to route by task kind or by a key in the args. For example, a worker-affinity balancer can hash a shard key out of the args so that every call for the same key hits the worker that already has its state loaded:

```ts
import { isTask, roundRobin } from 'moroutine';
import type { Balancer } from 'moroutine';
import { increment, read } from './counter.ts';

export function keyAffinity(): Balancer {
  const fallback = roundRobin();
  return {
    select(workers, task) {
      let key: string | undefined;
      if (isTask(increment, task)) key = task.args[0];
      else if (isTask(read, task)) key = task.args[0];
      if (key === undefined) return fallback.select(workers, task);
      // hash() is not shown in this snippet; it is assumed to be any
      // deterministic function from a string to a non-negative integer.
      return workers[hash(key) % workers.length];
    },
  };
}
```

Inside the `isTask` branch, `task.args` is typed as the moroutine's argument tuple (e.g. `[key: string, n: number]` for `increment`). See [`examples/worker-affinity`](examples/worker-affinity) for the full demo, including per-worker state that only stays consistent under affinity routing.

### Dedicated Workers

Awaiting a task directly (without a pool) runs it on a dedicated worker thread, one per moroutine function.

```ts
const result = await add(3, 4); // runs on a dedicated worker for `add`
```

### Task-Args

Pass a task as an argument to another task. The result is resolved on the worker and cached, so it never crosses back to the main thread. This is useful for non-transferable context like a database connection.

```ts
// db.ts
import { DatabaseSync } from 'node:sqlite';
import { mo } from 'moroutine';

export const openDb = mo(import.meta, (filename: string): DatabaseSync => {
  return new DatabaseSync(filename);
});

export const exec = mo(import.meta, (db: DatabaseSync, sql: string): void => {
  db.exec(sql);
});

export const query = mo(import.meta, (db: DatabaseSync, sql: string): unknown[] => {
  return db.prepare(sql).all();
});
```

```ts
import { workers } from 'moroutine';
import { openDb, exec, query } from './db.ts';

const db = openDb(':memory:');

{
  using run = workers(1);
  await run(exec(db, `CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)`));
  await run(exec(db, `INSERT INTO users (name) VALUES ('Alice')`));
  const rows = await run(query(db, 'SELECT * FROM users'));
  // [{ id: 1, name: 'Alice' }]
}
```

`openDb()` returns a `Task`, and `exec()`/`query()` accept it in place of `DatabaseSync`. The database is opened once on the worker and reused for every subsequent call — the main thread never touches it.

## Shared Memory

### Descriptors and `shared()`

Shared-memory types are created with descriptor functions or the `shared()` allocator.

```ts
import { shared, int32, int64, int32atomic, bool, mutex, rwlock, string, bytes } from 'moroutine';
```

#### Primitives

```ts
const counter = int32(); // standalone Int32
const flag = bool(); // standalone Bool
const big = int64(); // standalone Int64 (bigint)
```

#### Atomics

Atomic variants use `Atomics.*` for thread-safe operations without a lock.

```ts
const counter = int32atomic();
counter.add(1); // atomic increment, returns previous value
counter.load(); // atomic read
```

Full atomic operations: `load`, `store`, `add`, `sub`, `and`, `or`, `xor`, `exchange`, `compareExchange`.

`Int32Atomic` additionally exposes futex-style wait / wake:

```ts
const slot = int32atomic();

// In one thread — wait until the slot's value differs from 0,
// with an optional timeout.
await slot.waitAsync(0); // 'ok' | 'not-equal' | 'timed-out'
await slot.waitAsync(0, 100 /*ms*/); // with timeout

// In another thread — change the value and wake waiters.
slot.store(1);
slot.notify(); // wakes all waiters on this slot
slot.notify(2); // wakes at most 2
```

`waitAsync` returns `'not-equal'` synchronously (no microtask hop) if the slot already holds a value different from `expected`. The underlying `Atomics.wait`/`notify` futex is available only on `Int32Atomic`.

#### Structs

Plain objects in `shared()` create structs backed by a single `SharedArrayBuffer`.

```ts
const point = shared({ x: int32, y: int32 });

point.load(); // { x: 0, y: 0 }
point.store({ x: 10, y: 20 });
point.fields.x.store(10); // direct field access
```

Structs nest:

```ts
const rect = shared({
  pos: { x: int32, y: int32 },
  size: { w: int32, h: int32 },
});
```

#### Tuples

Arrays in `shared()` create fixed-length tuples.

```ts
const pair = shared([int32, bool]);

pair.load(); // [0, false]
pair.store([42, true]);
pair.elements[0].store(99);
```

#### Bytes and Strings

```ts
const buf = bytes(32); // fixed 32-byte buffer
buf.store(new Uint8Array(32)); // exact length required
buf.load(); // Readonly view
buf.view[0] = 0xff; // direct mutable access

const name = string(64); // UTF-8, max 64 bytes
name.store('hello');
name.load(); // 'hello'
```

#### Value Shorthand

Primitive values in schemas infer their type.

```ts
shared(0); // Int32 initialized to 0
shared(true); // Bool initialized to true
shared(0n); // Int64 initialized to 0n
shared({ x: 10, y: 20 }); // struct with Int32 fields
```

### Locks

#### Mutex

```ts
const mu = mutex();

using guard = await mu.lock(); // exclusive access
// auto-unlocks when guard is disposed

// or manually:
await mu.lock();
mu.unlock();

// Non-blocking attempt — returns null if held elsewhere.
using guard = mu.tryLock();
if (!guard) return;
```

#### RwLock

```ts
const rw = rwlock();

using guard = await rw.readLock(); // multiple readers OK
using guard = await rw.writeLock(); // exclusive access

// Non-blocking variants — return null if unavailable.
using r = rw.tryReadLock(); // null if write-locked
using w = rw.tryWriteLock(); // null if any lock is held
```

### Using with Workers

Shared-memory types pass through `postMessage` automatically. They're reconstructed on the worker side with the same shared backing memory.
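
The structs above are described as backed by a `SharedArrayBuffer`, and Node shares (rather than copies) such a buffer when it is posted through a `MessagePort`. The following is a minimal sketch of that underlying platform behaviour, independent of moroutine. It uses only `node:worker_threads`, keeps both ports on one thread purely for brevity, and all variable names are illustrative:

```ts
import { MessageChannel, receiveMessageOnPort } from 'node:worker_threads';

const { port1, port2 } = new MessageChannel();

// Sender side: four bytes of shared memory viewed as a single Int32.
const sab = new SharedArrayBuffer(4);
const senderView = new Int32Array(sab);

port1.postMessage(sab);

// Receiver side: read the message synchronously and wrap it in its own view.
const received = receiveMessageOnPort(port2);
if (!received) throw new Error('expected a message on port2');
const receiverView = new Int32Array(received.message as SharedArrayBuffer);

// A write through one view is visible through the other.
Atomics.add(receiverView, 0, 5);
console.log(Atomics.load(senderView, 0)); // 5

port1.close();
```

With a real worker, the receiving half would run on the other thread; the memory-sharing behaviour is the same.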

```ts
// update-position.ts
import { mo } from 'moroutine';
import type { Mutex, SharedStruct, Int32 } from 'moroutine';

type Position = SharedStruct<{ x: Int32; y: Int32 }>;

export const updatePosition = mo(
  import.meta,
  async (mu: Mutex, pos: Position, dx: number, dy: number): Promise<void> => {
    using guard = await mu.lock();
    const current = pos.load();
    pos.store({ x: current.x + dx, y: current.y + dy });
  },
);
```

```ts
// main.ts
import { workers, shared, int32, mutex } from 'moroutine';
import { updatePosition } from './update-position.ts';

const mu = mutex();
const pos = shared({ x: int32, y: int32 });

{
  using run = workers(4);
  await run([updatePosition(mu, pos, 1, 0), updatePosition(mu, pos, 0, 1)]);
}

console.log(pos.load()); // { x: 1, y: 1 }
```

## Streaming

### Streaming Moroutines

Wrap an `async function*` with `mo()` to create a streaming moroutine. Values are streamed between threads via `MessageChannel` with atomics-based backpressure — the producer parks when `highWaterMark` items (default 16) sit in flight and resumes when the consumer drains below the cap.

```ts
// count.ts
import { mo } from 'moroutine';

export const countUp = mo(import.meta, async function* (n: number) {
  for (let i = 0; i < n; i++) {
    yield i;
  }
});
```

Iterate directly (dedicated worker) or dispatch via a pool:

```ts
import { workers } from 'moroutine';
import { countUp } from './count.ts';

// Dedicated worker
for await (const n of countUp(5)) {
  console.log(n); // 0, 1, 2, 3, 4
}

// Worker pool
{
  using run = workers(2);
  for await (const n of run(countUp(5))) {
    console.log(n); // 0, 1, 2, 3, 4
  }
}
```

### `channel()` and Fan-out

When you pass the same `AsyncIterable` or streaming task argument to multiple tasks, each task gets its own copy of the data. Use `channel()` to share a single source across multiple workers — each item goes to exactly one consumer (work stealing).

```ts
import { workers, channel, assign, mo } from 'moroutine';

const generate = mo(import.meta, async function* (n: number) {
  for (let i = 0; i < n; i++) yield i;
});

const process = mo(import.meta, async (input: AsyncIterable<number>): Promise<number[]> => {
  const items: number[] = [];
  for await (const n of input) items.push(n);
  return items;
});
```

```ts
const ch = channel(generate(100));

{
  using run = workers();
  const fanout = run.workers.map((w) => {
    return assign(w, process(ch));
  });
  const results = await run(fanout);
  // Items distributed across workers — no duplicates, no gaps
}
```

Use `assign(worker, task)` to pin a task to a specific worker. `run.workers` is a read-only array of worker handles, one per pool worker.

Without `channel()`, `AsyncIterable` and streaming task arguments are auto-detected and streamed to a single consumer. `channel()` is only needed for fan-out.

### `map()` — Bounded Fan-out

Dispatch a stream of tasks to a pool with bounded concurrency, yielding results in completion order. Wrap each task with `inert()` so it passes through the stream as-is instead of being auto-awaited.

```ts
// main.ts
import { readdir } from 'node:fs/promises';
import { join } from 'node:path';
import { workers, map, inert } from 'moroutine';
import type { Task } from 'moroutine';
import { hashFile, type FileHash } from './hash-file.ts';

{
  using run = workers();
  for await (const { path, hash } of map(run, walk('./src'), { concurrency: 4 })) {
    console.log(`${hash.slice(0, 12)} ${path}`);
  }
}

async function* walk(dir: string): AsyncGenerator<Task<FileHash>> {
  for (const entry of await readdir(dir, { withFileTypes: true })) {
    const p = join(dir, entry.name);
    if (entry.isDirectory()) {
      yield* walk(p);
    } else {
      yield inert(hashFile(p));
    }
  }
}
```

```ts
// hash-file.ts
import { readFile } from 'node:fs/promises';
import { createHash } from 'node:crypto';
import { mo } from 'moroutine';

export type FileHash = { path: string; hash: string };

export const hashFile = mo(import.meta, async (path: string): Promise<FileHash> => {
  const buf = await readFile(path);
  return { path, hash: createHash('sha256').update(buf).digest('hex') };
});
```

`map()` accepts a sync iterable, async iterable, or generator of tasks. `concurrency` caps in-flight dispatches (default `1`). Mixed task types unify: `map` over `Task<string> | Task<number>` yields `string | number`. An optional `signal` aborts iteration — and, since moroutine auto-transfers `AbortSignal` args, the same signal passed to tasks will also cancel in-flight work.

### Pipelines

Chain streaming moroutines by passing one as an argument to the next. Each stage runs on its own dedicated worker.

```ts
const doubled = double(generate(5));
const squared = square(doubled);

for await (const n of squared) {
  console.log(n);
}
```

## Transfers

Use `transfer()` for zero-copy movement of `ArrayBuffer`, `TypedArray`, `MessagePort`, or streams.

```ts
import { transfer } from 'moroutine';

const buf = new ArrayBuffer(1024);
await run(processData(transfer(buf)));
// buf is now detached (zero-length) — ownership moved to worker
```

Return values from workers are auto-transferred when possible.

## Examples

All examples require Node v24+ and can be run directly, e.g. `node examples/primes/main.ts`.

- [`examples/primes`](examples/primes) -- CPU-bound prime checking on a dedicated worker
- [`examples/non-blocking`](examples/non-blocking) -- main thread stays responsive during heavy computation
- [`examples/parallel-batch`](examples/parallel-batch) -- sequential vs parallel batch processing
- [`examples/atomics`](examples/atomics) -- shared atomic counter across workers
- [`examples/shared-state`](examples/shared-state) -- mutex-protected shared struct
- [`examples/multi-module`](examples/multi-module) -- moroutines from multiple modules on one worker
- [`examples/transfer`](examples/transfer) -- zero-copy buffer transfer to and from a worker
- [`examples/sqlite`](examples/sqlite) -- shared SQLite database on a worker via task-arg caching
- [`examples/pipeline`](examples/pipeline) -- streaming pipeline across dedicated workers
- [`examples/channel-fanout`](examples/channel-fanout) -- fan-out a channel to multiple workers via work stealing
- [`examples/bounded-map`](examples/bounded-map) -- recursive tree walk hashing files with `map()`
- [`examples/load-balancing`](examples/load-balancing) -- round-robin vs least-busy with variable-cost tasks
- [`examples/worker-affinity`](examples/worker-affinity) -- custom balancer using `isTask()` to route tasks by key
- [`examples/benchmark`](examples/benchmark) -- roundtrip channel throughput with 1–N workers