moroutine#

Offload functions to worker threads with shared memory primitives for Node.js.

Installation#

npm install moroutine

Requires Node.js v24+.

Quick Start#

// main.ts
import { isPrime } from './is-prime.ts';

const result = await isPrime(999_999_937);
console.log(result); // true

// is-prime.ts
import { mo } from 'moroutine';

export const isPrime = mo(import.meta, (n: number): boolean => {
  if (n < 2) return false;
  for (let i = 2; i * i <= n; i++) {
    if (n % i === 0) return false;
  }
  return true;
});

Define a function with mo() in its own module, then import and run it on a worker pool. Moroutine modules must be side-effect free — workers import them to find the registered functions.

Core API#

mo(import.meta, fn)#

Wraps a function so it runs on a worker thread. The function must be defined at module scope (not dynamically).

// math.ts
import { mo } from 'moroutine';

export const add = mo(import.meta, (a: number, b: number): number => {
  return a + b;
});

workers(size?, opts?)#

Creates a pool of worker threads. Returns a Runner that dispatches tasks. Disposable via using or [Symbol.dispose](). Defaults to os.availableParallelism() workers and round-robin scheduling when arguments are omitted.

import { workers } from 'moroutine';
import { add } from './math.ts';

{
  using run = workers(2);

  const result = await run(add(3, 4)); // single task
  const [a, b] = await run([add(1, 2), add(3, 4)]); // batch
}

Graceful Shutdown#

Use await using for graceful async shutdown. The pool exposes a signal that fires when disposal begins — thread it into tasks for cooperative cancellation.

{
  await using run = workers(4);

  run(longTask(run.signal)); // task can react to abort
  run(otherTask()); // runs to completion
}
// signal fired, waited for both tasks, then workers terminated

Use using (without await) for immediate termination: workers are stopped without waiting for in-flight tasks.

A shutdownTimeout option force-terminates workers if graceful shutdown takes too long:

{
  await using run = workers(4, { shutdownTimeout: 5000 });
  // ...
}

Load Balancing#

The pool uses round-robin scheduling by default. Pass a balance option to change the strategy:

import { workers, leastBusy } from 'moroutine';

{
  using run = workers(4, { balance: leastBusy() });
  // tasks dispatched to whichever worker has the fewest in-flight tasks
}

Built-in balancers:

  • roundRobin() — cycles through workers in order (default)
  • leastBusy() — picks the worker with the lowest active task count

Custom balancers implement the Balancer interface:

import type { Balancer, WorkerHandle, Task } from 'moroutine';

const random: Balancer = {
  select(workers: readonly WorkerHandle[], task: Task) {
    return workers[Math.floor(Math.random() * workers.length)];
  },
};

Each WorkerHandle exposes activeCount (in-flight tasks) and thread (the underlying worker_threads.Worker) for building custom strategies.
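
To make the two built-in strategies concrete, here is a minimal, self-contained sketch of the selection logic only. It is not moroutine's source: the Handle type, makeRoundRobin, and pickLeastBusy are hypothetical names, and only the activeCount field described above is modeled.

```typescript
// Hypothetical stand-in for a worker handle: only the in-flight count is modeled.
type Handle = { id: number; activeCount: number };

// Round-robin: remember the next position and advance by one on every call.
function makeRoundRobin() {
  let next = 0;
  return (handles: readonly Handle[]): Handle => {
    const picked = handles[next % handles.length];
    next = (next + 1) % handles.length;
    return picked;
  };
}

// Least-busy: scan for the smallest activeCount (the first handle wins ties).
function pickLeastBusy(handles: readonly Handle[]): Handle {
  let best = handles[0];
  for (const h of handles) {
    if (h.activeCount < best.activeCount) best = h;
  }
  return best;
}

const pool: Handle[] = [
  { id: 0, activeCount: 3 },
  { id: 1, activeCount: 1 },
  { id: 2, activeCount: 2 },
];
const rr = makeRoundRobin();
const rrOrder = [rr(pool).id, rr(pool).id, rr(pool).id, rr(pool).id]; // [0, 1, 2, 0]
const idlest = pickLeastBusy(pool).id; // 1
```

Round-robin ignores load entirely, which is cheap and fair for uniform tasks; least-busy costs a scan per dispatch but copes better when task durations vary.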

isTask(moroutine, task) narrows a task to the descriptor type produced by a specific moroutine — useful inside a balancer to route by task kind or by a key in the args. For example, a worker-affinity balancer can hash a shard key out of the args so that every call for the same key hits the worker that already has its state loaded:

import { isTask, roundRobin } from 'moroutine';
import type { Balancer } from 'moroutine';
import { increment, read } from './counter.ts';

export function keyAffinity(): Balancer {
  const fallback = roundRobin();
  return {
    select(workers, task) {
      let key: string | undefined;
      if (isTask(increment, task)) key = task.args[0];
      else if (isTask(read, task)) key = task.args[0];
      if (key === undefined) return fallback.select(workers, task);
      return workers[hash(key) % workers.length];
    },
  };
}

Inside the isTask branch, task.args is typed as the moroutine's argument tuple (e.g. [key: string, n: number] for increment). See examples/worker-affinity for the full demo, including per-worker state that only stays consistent under affinity routing.
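
The keyAffinity snippet calls a hash(key) helper that is not shown. One possible implementation, offered as an assumption rather than what the demo actually uses, is a 32-bit FNV-1a variant over the string's UTF-16 code units. The important property is that the result is a non-negative integer, so the modulo never produces a negative index.

```typescript
// Assumed helper: FNV-1a style hash over UTF-16 code units (the demo may use something else).
function hash(key: string): number {
  let h = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < key.length; i++) {
    h ^= key.charCodeAt(i);
    h = Math.imul(h, 0x01000193); // FNV prime, 32-bit wrapping multiply
  }
  return h >>> 0; // force unsigned so `%` never yields a negative index
}

const workerCount = 4;
const slotA = hash("user:42") % workerCount;
const slotB = hash("user:42") % workerCount; // same key, same worker index
```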

Dedicated Workers#

Awaiting a task directly (without a pool) runs it on a dedicated worker thread, one per moroutine function.

const result = await add(3, 4); // runs on a dedicated worker for `add`

Task-Args#

Pass a task as an argument to another task. The result is resolved on the worker and cached, so it never crosses back to the main thread. This is useful for non-transferable context like a database connection.

// db.ts
import { DatabaseSync } from 'node:sqlite';
import { mo } from 'moroutine';

export const openDb = mo(import.meta, (filename: string): DatabaseSync => {
  return new DatabaseSync(filename);
});

export const exec = mo(import.meta, (db: DatabaseSync, sql: string): void => {
  db.exec(sql);
});

export const query = mo(import.meta, (db: DatabaseSync, sql: string): unknown[] => {
  return db.prepare(sql).all();
});

// main.ts
import { workers } from 'moroutine';
import { openDb, exec, query } from './db.ts';

const db = openDb(':memory:');

{
  using run = workers(1);
  await run(exec(db, `CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)`));
  await run(exec(db, `INSERT INTO users (name) VALUES ('Alice')`));
  const rows = await run(query(db, 'SELECT * FROM users')); // [{ id: 1, name: 'Alice' }]
}

openDb() returns a Task<DatabaseSync>, and exec()/query() accept it in place of DatabaseSync. The database is opened once on the worker and reused for every subsequent call — the main thread never touches it.

Shared Memory#

Descriptors and shared()#

Shared-memory types are created with descriptor functions or the shared() allocator.

import { shared, int32, int64, int32atomic, bool, mutex, rwlock, string, bytes } from 'moroutine';

Primitives#

const counter = int32(); // standalone Int32
const flag = bool(); // standalone Bool
const big = int64(); // standalone Int64 (bigint)

Atomics#

Atomic variants use Atomics.* for thread-safe operations without a lock.

const counter = int32atomic();
counter.add(1); // atomic increment, returns previous value
counter.load(); // atomic read

Full atomic operations: load, store, add, sub, and, or, xor, exchange, compareExchange.
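
For readers unfamiliar with the built-in Atomics object that these variants are described as using, the sketch below shows the semantics of add and compareExchange on a raw Int32Array. It uses only standard JavaScript globals and does not involve moroutine.

```typescript
// One 32-bit slot in shared memory: an Int32Array view over a SharedArrayBuffer.
const cell = new Int32Array(new SharedArrayBuffer(4));

const before = Atomics.add(cell, 0, 1); // returns the value held before the add
const after = Atomics.load(cell, 0);

// compareExchange stores only if the current value equals the expected one,
// and always returns the value it found.
const found = Atomics.compareExchange(cell, 0, 1, 10); // 1 matches, so 10 is stored
const missed = Atomics.compareExchange(cell, 0, 1, 99); // 10 !== 1, nothing is stored
const finalValue = Atomics.load(cell, 0);
```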

Int32Atomic additionally exposes futex-style wait / wake:

const slot = int32atomic();

// In one thread — wait until the slot's value differs from 0,
// with an optional timeout.
await slot.waitAsync(0); // 'ok' | 'not-equal' | 'timed-out'
await slot.waitAsync(0, 100 /*ms*/); // with timeout

// In another thread — change the value and wake waiters.
slot.store(1);
slot.notify(); // wakes all waiters on this slot
slot.notify(2); // wakes at most 2

waitAsync returns 'not-equal' synchronously (no microtask hop) if the slot already holds a value different from expected. The underlying Atomics.wait/notify futex is available only on Int32Atomic.
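
The three result strings match those of the standard Atomics.wait. The sketch below uses the blocking built-in directly on a raw Int32Array (not moroutine's waitAsync) to show when 'timed-out' and 'not-equal' occur. It assumes a runtime such as Node.js that permits Atomics.wait on the calling thread.

```typescript
const slot = new Int32Array(new SharedArrayBuffer(4));

// The value equals the expected 0 and nobody notifies: a zero timeout expires at once.
const timedOut = Atomics.wait(slot, 0, 0, 0);

Atomics.store(slot, 0, 1);
// The value (1) already differs from the expected 0: returns without sleeping.
const notEqual = Atomics.wait(slot, 0, 0);

// notify returns how many waiters were woken; no thread is waiting here.
const woken = Atomics.notify(slot, 0);
```

The remaining result, 'ok', is only produced when another thread calls notify while the waiter is parked.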

Structs#

Plain objects in shared() create structs backed by a single SharedArrayBuffer.

const point = shared({ x: int32, y: int32 });

point.load(); // { x: 0, y: 0 }
point.store({ x: 10, y: 20 });
point.fields.x.store(10); // direct field access

Structs nest:

const rect = shared({
  pos: { x: int32, y: int32 },
  size: { w: int32, h: int32 },
});
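
One way to picture a struct backed by a single SharedArrayBuffer is as a set of typed views at fixed byte offsets. The layout below is a hypothetical illustration for { x: int32, y: int32 }; moroutine's actual layout, padding, and accessors may differ.

```typescript
// Hypothetical layout: two 4-byte fields, back to back, in one buffer.
const sab = new SharedArrayBuffer(8);
const fields = {
  x: new Int32Array(sab, 0, 1), // bytes 0..3
  y: new Int32Array(sab, 4, 1), // bytes 4..7
};

function storePoint(p: { x: number; y: number }): void {
  Atomics.store(fields.x, 0, p.x);
  Atomics.store(fields.y, 0, p.y);
}

function loadPoint(): { x: number; y: number } {
  return { x: Atomics.load(fields.x, 0), y: Atomics.load(fields.y, 0) };
}

const initial = loadPoint(); // a fresh buffer is zero-filled
storePoint({ x: 10, y: 20 });
const whole = new Int32Array(sab); // a second view over the same memory
```

In this sketch each field is written separately, so a reader on another thread could observe a new x alongside an old y. Guarding multi-field updates with a lock avoids that.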

Tuples#

Arrays in shared() create fixed-length tuples.

const pair = shared([int32, bool]);
pair.load(); // [0, false]
pair.store([42, true]);
pair.elements[0].store(99);

Bytes and Strings#

const buf = bytes(32); // fixed 32-byte buffer
buf.store(new Uint8Array(32)); // exact length required
buf.load(); // Readonly<Uint8Array> view
buf.view[0] = 0xff; // direct mutable access

const name = string(64); // UTF-8, max 64 bytes
name.store('hello');
name.load(); // 'hello'
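
Because the limit is expressed in bytes, not characters, non-ASCII text uses up capacity faster than its length suggests. The sketch below is a hypothetical fixed-capacity string cell (a length slot plus a byte region) built from standard globals; it is not how moroutine necessarily stores strings, and the overflow behavior shown is an assumption.

```typescript
// Hypothetical fixed-capacity cell: a length slot plus a region for the UTF-8 bytes.
const CAPACITY = 64;
const lengthCell = new Int32Array(new SharedArrayBuffer(4));
const byteCell = new Uint8Array(new SharedArrayBuffer(CAPACITY));

function storeString(value: string): void {
  const encoded = new TextEncoder().encode(value);
  if (encoded.length > CAPACITY) {
    throw new RangeError(`needs ${encoded.length} bytes, capacity is ${CAPACITY}`);
  }
  byteCell.set(encoded);
  Atomics.store(lengthCell, 0, encoded.length);
}

function loadString(): string {
  const length = Atomics.load(lengthCell, 0);
  // slice() copies into an unshared buffer before decoding.
  return new TextDecoder().decode(byteCell.slice(0, length));
}

storeString("h\u00e9llo"); // 5 characters, 6 bytes in UTF-8
const storedBytes = Atomics.load(lengthCell, 0);
const roundTrip = loadString();

let rejected = false;
try {
  storeString("\u00e9".repeat(33)); // 33 characters, 66 bytes
} catch {
  rejected = true;
}
```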

Value Shorthand#

Primitive values in schemas infer their type.

shared(0); // Int32 initialized to 0
shared(true); // Bool initialized to true
shared(0n); // Int64 initialized to 0n
shared({ x: 10, y: 20 }); // struct with Int32 fields

Locks#

Mutex#

const mu = mutex();

{
  using guard = await mu.lock();
  // exclusive access
  // auto-unlocks when guard is disposed at the end of the block
}

// or manually:
await mu.lock();
mu.unlock();

// Non-blocking attempt: returns null if held elsewhere.
{
  using guard = mu.tryLock();
  if (guard) {
    // exclusive access
  }
}
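
A shared-memory mutex is commonly built on a single integer slot and a compare-and-swap. The sketch below shows that idea for the non-blocking path only; it is an illustration with hypothetical names, not moroutine's implementation.

```typescript
const UNLOCKED = 0;
const LOCKED = 1;
const state = new Int32Array(new SharedArrayBuffer(4));

// Succeeds only if the swap 0 -> 1 happens, i.e. nobody else holds the lock.
function tryLock(): boolean {
  return Atomics.compareExchange(state, 0, UNLOCKED, LOCKED) === UNLOCKED;
}

function unlock(): void {
  Atomics.store(state, 0, UNLOCKED);
  Atomics.notify(state, 0, 1); // wake one waiter, if any thread is parked on this slot
}

const first = tryLock(); // true: the lock was free
const second = tryLock(); // false: already held
unlock();
const third = tryLock(); // true again after unlock
```

A blocking lock() would wait on the same slot when tryLock fails and retry after being notified.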

RwLock#

const rw = rwlock();

{
  using guard = await rw.readLock(); // multiple readers OK
}
{
  using guard = await rw.writeLock(); // exclusive access
}

// Non-blocking variants return null if unavailable.
{
  using r = rw.tryReadLock(); // null if write-locked
}
{
  using w = rw.tryWriteLock(); // null if any lock is held
}
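
The rules in the comments above (readers share, writers exclude everyone) can be modeled with one integer: 0 means free, a positive value counts readers, and -1 marks a writer. This encoding is an assumption chosen for illustration and may not match moroutine's internals.

```typescript
const WRITER = -1;
const lockState = new Int32Array(new SharedArrayBuffer(4));

function tryReadLock(): boolean {
  for (;;) {
    const seen = Atomics.load(lockState, 0);
    if (seen === WRITER) return false; // a writer holds the lock
    // Bump the reader count; retry if another thread changed it in between.
    if (Atomics.compareExchange(lockState, 0, seen, seen + 1) === seen) return true;
  }
}

function tryWriteLock(): boolean {
  // Only a completely free lock (0) can be taken for writing.
  return Atomics.compareExchange(lockState, 0, 0, WRITER) === 0;
}

function readUnlock(): void {
  Atomics.sub(lockState, 0, 1);
}

const r1 = tryReadLock(); // true
const r2 = tryReadLock(); // true: readers share
const w1 = tryWriteLock(); // false: readers are active
readUnlock();
readUnlock();
const w2 = tryWriteLock(); // true: lock is free
const r3 = tryReadLock(); // false: write-locked
```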

Using with Workers#

Shared-memory types pass through postMessage automatically. They're reconstructed on the worker side with the same shared backing memory.

// update-position.ts
import { mo } from 'moroutine';
import type { Mutex, SharedStruct, Int32 } from 'moroutine';

type Position = SharedStruct<{ x: Int32; y: Int32 }>;

export const updatePosition = mo(
  import.meta,
  async (mu: Mutex, pos: Position, dx: number, dy: number): Promise<void> => {
    using guard = await mu.lock();
    const current = pos.load();
    pos.store({ x: current.x + dx, y: current.y + dy });
  },
);

// main.ts
import { workers, shared, int32, mutex } from 'moroutine';
import { updatePosition } from './update-position.ts';

const mu = mutex();
const pos = shared({ x: int32, y: int32 });

{
  using run = workers(4);
  await run([updatePosition(mu, pos, 1, 0), updatePosition(mu, pos, 0, 1)]);
}

console.log(pos.load()); // { x: 1, y: 1 }

Streaming#

Streaming Moroutines#

Wrap an async function* with mo() to create a streaming moroutine. Values are streamed between threads via MessageChannel with atomics-based backpressure — the producer parks when highWaterMark items (default 16) sit in flight and resumes when the consumer drains below the cap.
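
The backpressure rule can be pictured as a shared in-flight counter: the producer claims a slot per item and must park once the count reaches the cap, and the consumer releases a slot per item it drains. The single-threaded sketch below models only that counting with hypothetical names; the real transport, parking, and wake-up logic live inside moroutine.

```typescript
const HIGH_WATER_MARK = 16;
const inFlight = new Int32Array(new SharedArrayBuffer(4));

// Producer side: claim a slot, or report that the caller should park.
function trySend(): boolean {
  for (;;) {
    const n = Atomics.load(inFlight, 0);
    if (n >= HIGH_WATER_MARK) return false;
    if (Atomics.compareExchange(inFlight, 0, n, n + 1) === n) return true;
  }
}

// Consumer side: release a slot and wake a parked producer, if there is one.
function acknowledge(): void {
  Atomics.sub(inFlight, 0, 1);
  Atomics.notify(inFlight, 0, 1);
}

let sent = 0;
while (trySend()) sent++; // fills up to the cap
const blocked = !trySend(); // the producer would park here
acknowledge(); // the consumer drains one item
const resumed = trySend(); // a slot is free again
```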

// count.ts
import { mo } from 'moroutine';

export const countUp = mo(import.meta, async function* (n: number) {
  for (let i = 0; i < n; i++) {
    yield i;
  }
});

Iterate directly (dedicated worker) or dispatch via a pool:

import { workers } from 'moroutine';
import { countUp } from './count.ts';

// Dedicated worker
for await (const n of countUp(5)) {
  console.log(n); // 0, 1, 2, 3, 4
}

// Worker pool
{
  using run = workers(2);
  for await (const n of run(countUp(5))) {
    console.log(n); // 0, 1, 2, 3, 4
  }
}

channel() and Fan-out#

When you pass the same AsyncIterable or streaming task argument to multiple tasks, each task gets its own copy of the data. Use channel() to share a single source across multiple workers — each item goes to exactly one consumer (work stealing).

import { workers, channel, assign, mo } from 'moroutine';

const generate = mo(import.meta, async function* (n: number) {
  for (let i = 0; i < n; i++) yield i;
});

const process = mo(import.meta, async (input: AsyncIterable<number>): Promise<number[]> => {
  const items: number[] = [];
  for await (const n of input) items.push(n);
  return items;
});

const ch = channel(generate(100));

{
  using run = workers();
  const fanout = run.workers.map((w) => {
    return assign(w, process(ch));
  });
  const results = await run(fanout);
  // Items distributed across workers — no duplicates, no gaps
}

Use assign(worker, task) to pin a task to a specific worker. run.workers is a read-only array of worker handles, one per pool worker.

Without channel(), AsyncIterable and streaming task arguments are auto-detected and streamed to a single consumer. channel() is only needed for fan-out.

map() — Bounded Fan-out#

Dispatch a stream of tasks to a pool with bounded concurrency, yielding results in completion order. Wrap each task with inert() so it passes through the stream as-is instead of being auto-awaited.

// main.ts
import { readdir } from 'node:fs/promises';
import { join } from 'node:path';
import { workers, map, inert } from 'moroutine';
import type { Task } from 'moroutine';
import { hashFile, type FileHash } from './hash-file.ts';

{
  using run = workers();
  for await (const { path, hash } of map(run, walk('./src'), { concurrency: 4 })) {
    console.log(`${hash.slice(0, 12)}  ${path}`);
  }
}

async function* walk(dir: string): AsyncGenerator<Task<FileHash>> {
  for (const entry of await readdir(dir, { withFileTypes: true })) {
    const p = join(dir, entry.name);
    if (entry.isDirectory()) {
      yield* walk(p);
    } else {
      yield inert(hashFile(p));
    }
  }
}

// hash-file.ts
import { readFile } from 'node:fs/promises';
import { createHash } from 'node:crypto';
import { mo } from 'moroutine';

export type FileHash = { path: string; hash: string };

export const hashFile = mo(import.meta, async (path: string): Promise<FileHash> => {
  const buf = await readFile(path);
  return { path, hash: createHash('sha256').update(buf).digest('hex') };
});

map() accepts a sync iterable, async iterable, or generator of tasks. concurrency caps in-flight dispatches (default 1). Mixed task types unify: map over Task<string> | Task<number> yields string | number. An optional signal aborts iteration — and, since moroutine auto-transfers AbortSignal args, the same signal passed to tasks will also cancel in-flight work.
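
The scheduling behavior described here (a cap on in-flight work, results in completion order) can be sketched without workers at all. The mapBounded function below is a hypothetical stand-in that runs plain promise-returning thunks on one thread; it illustrates the pattern and is not moroutine's map().

```typescript
// Runs at most `concurrency` thunks at once and yields results as they finish.
async function* mapBounded<T>(
  thunks: Iterable<() => Promise<T>>,
  concurrency: number,
): AsyncGenerator<T> {
  const inFlight = new Map<number, Promise<{ id: number; value: T }>>();
  const source = thunks[Symbol.iterator]();
  let nextId = 0;

  for (;;) {
    // Top up to the concurrency cap.
    while (inFlight.size < concurrency) {
      const step = source.next();
      if (step.done) break;
      const id = nextId++;
      inFlight.set(id, step.value().then((value) => ({ id, value })));
    }
    if (inFlight.size === 0) return;
    // Wait for whichever in-flight task settles first, then free its slot.
    const { id, value } = await Promise.race(inFlight.values());
    inFlight.delete(id);
    yield value;
  }
}

// A thunk that resolves to `label` after `ms` milliseconds.
const delay = (ms: number, label: string) => () =>
  new Promise<string>((resolve) => setTimeout(() => resolve(label), ms));

async function demo(): Promise<string[]> {
  const out: string[] = [];
  const tasks = [delay(60, "slow"), delay(5, "fast"), delay(5, "late")];
  for await (const label of mapBounded(tasks, 2)) out.push(label);
  return out; // completion order, not submission order
}
```

With a cap of 2, the third task starts only after the first result is consumed, so a slow task at the front does not hold back the quick ones behind it.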

Pipelines#

Chain streaming moroutines by passing one as an argument to the next. Each stage runs on its own dedicated worker.

const doubled = double(generate(5));
const squared = square(doubled);
for await (const n of squared) {
  console.log(n);
}

Transfers#

Use transfer() for zero-copy movement of ArrayBuffer, TypedArray, MessagePort, or streams.

import { transfer } from 'moroutine';

const buf = new ArrayBuffer(1024);
await run(processData(transfer(buf)));
// buf is now detached (zero-length) — ownership moved to worker

Return values from workers are auto-transferred when possible.
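
The detachment mentioned in the comment above is standard structured-clone transfer behavior and can be observed without any worker. The sketch below uses the built-in structuredClone with a transfer list, not moroutine's transfer().

```typescript
const source = new ArrayBuffer(1024);

// With a transfer list, the buffer is moved to the clone instead of being copied.
const moved = structuredClone(source, { transfer: [source] });

const sourceLength = source.byteLength; // 0: the original is detached
const movedLength = moved.byteLength; // 1024: the new owner holds the memory
```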

Examples#

All examples require Node v24+ and can be run directly, e.g. node examples/primes/main.ts.