Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

1# moroutine 2 3> Offload functions to worker threads with shared memory primitives for Node.js. 4 5## Installation 6 7```sh 8npm install moroutine 9``` 10 11Requires Node.js v24+. 12 13## Quick Start 14 15```ts 16// main.ts 17import { isPrime } from './is-prime.ts'; 18 19const result = await isPrime(999_999_937); 20console.log(result); // true 21``` 22 23```ts 24// is-prime.ts 25import { mo } from 'moroutine'; 26 27export const isPrime = mo(import.meta, (n: number): boolean => { 28 if (n < 2) return false; 29 for (let i = 2; i * i <= n; i++) { 30 if (n % i === 0) return false; 31 } 32 return true; 33}); 34``` 35 36Define a function with `mo()` in its own module, then import and run it on a worker pool. Moroutine modules must be side-effect free — workers import them to find the registered functions. 37 38## Core API 39 40### `mo(import.meta, fn)` 41 42Wraps a function so it runs on a worker thread. The function must be defined at module scope (not dynamically). 43 44```ts 45// math.ts 46import { mo } from 'moroutine'; 47 48export const add = mo(import.meta, (a: number, b: number): number => { 49 return a + b; 50}); 51``` 52 53### `workers(size?, opts?)` 54 55Creates a pool of worker threads. Returns a `Runner` that dispatches tasks. Disposable via `using` or `[Symbol.dispose]()`. Defaults to `os.availableParallelism()` workers and round-robin scheduling when arguments are omitted. 56 57```ts 58import { workers } from 'moroutine'; 59import { add } from './math.ts'; 60 61{ 62 using run = workers(2); 63 64 const result = await run(add(3, 4)); // single task 65 const [a, b] = await run([add(1, 2), add(3, 4)]); // batch 66} 67``` 68 69#### Graceful Shutdown 70 71Use `await using` for graceful async shutdown. The pool exposes a `signal` that fires when disposal begins — thread it into tasks for cooperative cancellation. 72 73```ts 74{ 75 await using run = workers(4); 76 77 run(longTask(run.signal)); // task can react to abort 78 run(otherTask()); // runs to completion 79} 80// signal fired, waited for both tasks, then workers terminated 81``` 82 83Use `using` (without `await`) for immediate termination, same as before. 84 85A `shutdownTimeout` option force-terminates workers if graceful shutdown takes too long: 86 87```ts 88{ 89 await using run = workers(4, { shutdownTimeout: 5000 }); 90 // ... 91} 92``` 93 94#### Load Balancing 95 96The pool uses round-robin scheduling by default. Pass a `balance` option to change the strategy: 97 98```ts 99import { workers, leastBusy } from 'moroutine'; 100 101{ 102 using run = workers(4, { balance: leastBusy() }); 103 // tasks dispatched to whichever worker has the fewest in-flight tasks 104} 105``` 106 107Built-in balancers: 108 109- `roundRobin()` — cycles through workers in order (default) 110- `leastBusy()` — picks the worker with the lowest active task count 111 112Custom balancers implement the `Balancer` interface: 113 114```ts 115import type { Balancer, WorkerHandle, Task } from 'moroutine'; 116 117const random: Balancer = { 118 select(workers: readonly WorkerHandle[], task: Task) { 119 return workers[Math.floor(Math.random() * workers.length)]; 120 }, 121}; 122``` 123 124Each `WorkerHandle` exposes `activeCount` (in-flight tasks) and `thread` (the underlying `worker_threads.Worker`) for building custom strategies. 125 126`isTask(moroutine, task)` narrows a task to the descriptor type produced by a specific moroutine — useful inside a balancer to route by task kind or by a key in the args. For example, a worker-affinity balancer can hash a shard key out of the args so that every call for the same key hits the worker that already has its state loaded: 127 128```ts 129import { isTask, roundRobin } from 'moroutine'; 130import type { Balancer } from 'moroutine'; 131import { increment, read } from './counter.ts'; 132 133export function keyAffinity(): Balancer { 134 const fallback = roundRobin(); 135 return { 136 select(workers, task) { 137 let key: string | undefined; 138 if (isTask(increment, task)) key = task.args[0]; 139 else if (isTask(read, task)) key = task.args[0]; 140 if (key === undefined) return fallback.select(workers, task); 141 return workers[hash(key) % workers.length]; 142 }, 143 }; 144} 145``` 146 147Inside the `isTask` branch, `task.args` is typed as the moroutine's argument tuple (e.g. `[key: string, n: number]` for `increment`). See [`examples/worker-affinity`](examples/worker-affinity) for the full demo, including per-worker state that only stays consistent under affinity routing. 148 149### Dedicated Workers 150 151Awaiting a task directly (without a pool) runs it on a dedicated worker thread, one per moroutine function. 152 153```ts 154const result = await add(3, 4); // runs on a dedicated worker for `add` 155``` 156 157### Task-Args 158 159Pass a task as an argument to another task. The result is resolved on the worker and cached, so it never crosses back to the main thread. This is useful for non-transferable context like a database connection. 160 161```ts 162// db.ts 163import { DatabaseSync } from 'node:sqlite'; 164import { mo } from 'moroutine'; 165 166export const openDb = mo(import.meta, (filename: string): DatabaseSync => { 167 return new DatabaseSync(filename); 168}); 169 170export const exec = mo(import.meta, (db: DatabaseSync, sql: string): void => { 171 db.exec(sql); 172}); 173 174export const query = mo(import.meta, (db: DatabaseSync, sql: string): unknown[] => { 175 return db.prepare(sql).all(); 176}); 177``` 178 179```ts 180import { workers } from 'moroutine'; 181import { openDb, exec, query } from './db.ts'; 182 183const db = openDb(':memory:'); 184 185{ 186 using run = workers(1); 187 await run(exec(db, `CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)`)); 188 await run(exec(db, `INSERT INTO users (name) VALUES ('Alice')`)); 189 const rows = await run(query(db, 'SELECT * FROM users')); // [{ id: 1, name: 'Alice' }] 190} 191``` 192 193`openDb()` returns a `Task<DatabaseSync>`, and `exec()`/`query()` accept it in place of `DatabaseSync`. The database is opened once on the worker and reused for every subsequent call — the main thread never touches it. 194 195## Shared Memory 196 197### Descriptors and `shared()` 198 199Shared-memory types are created with descriptor functions or the `shared()` allocator. 200 201```ts 202import { shared, int32, bool, mutex, string, bytes } from 'moroutine'; 203``` 204 205#### Primitives 206 207```ts 208const counter = int32(); // standalone Int32 209const flag = bool(); // standalone Bool 210const big = int64(); // standalone Int64 (bigint) 211``` 212 213#### Atomics 214 215Atomic variants use `Atomics.*` for thread-safe operations without a lock. 216 217```ts 218const counter = int32atomic(); 219counter.add(1); // atomic increment, returns previous value 220counter.load(); // atomic read 221``` 222 223Full atomic operations: `load`, `store`, `add`, `sub`, `and`, `or`, `xor`, `exchange`, `compareExchange`. 224 225#### Structs 226 227Plain objects in `shared()` create structs backed by a single `SharedArrayBuffer`. 228 229```ts 230const point = shared({ x: int32, y: int32 }); 231 232point.load(); // { x: 0, y: 0 } 233point.store({ x: 10, y: 20 }); 234point.fields.x.store(10); // direct field access 235``` 236 237Structs nest: 238 239```ts 240const rect = shared({ 241 pos: { x: int32, y: int32 }, 242 size: { w: int32, h: int32 }, 243}); 244``` 245 246#### Tuples 247 248Arrays in `shared()` create fixed-length tuples. 249 250```ts 251const pair = shared([int32, bool]); 252pair.load(); // [0, false] 253pair.store([42, true]); 254pair.elements[0].store(99); 255``` 256 257#### Bytes and Strings 258 259```ts 260const buf = bytes(32); // fixed 32-byte buffer 261buf.store(new Uint8Array(32)); // exact length required 262buf.load(); // Readonly<Uint8Array> view 263buf.view[0] = 0xff; // direct mutable access 264 265const name = string(64); // UTF-8, max 64 bytes 266name.store('hello'); 267name.load(); // 'hello' 268``` 269 270#### Value Shorthand 271 272Primitive values in schemas infer their type. 273 274```ts 275shared(0); // Int32 initialized to 0 276shared(true); // Bool initialized to true 277shared(0n); // Int64 initialized to 0n 278shared({ x: 10, y: 20 }); // struct with Int32 fields 279``` 280 281### Locks 282 283#### Mutex 284 285```ts 286const mu = mutex(); 287 288using guard = await mu.lock(); 289// exclusive access 290// auto-unlocks when guard is disposed 291 292// or manually: 293await mu.lock(); 294mu.unlock(); 295``` 296 297#### RwLock 298 299```ts 300const rw = rwlock(); 301 302using guard = await rw.readLock(); // multiple readers OK 303using guard = await rw.writeLock(); // exclusive access 304``` 305 306### Using with Workers 307 308Shared-memory types pass through `postMessage` automatically. They're reconstructed on the worker side with the same shared backing memory. 309 310```ts 311// update-position.ts 312import { mo } from 'moroutine'; 313import type { Mutex, SharedStruct, Int32 } from 'moroutine'; 314 315type Position = SharedStruct<{ x: Int32; y: Int32 }>; 316 317export const updatePosition = mo( 318 import.meta, 319 async (mu: Mutex, pos: Position, dx: number, dy: number): Promise<void> => { 320 using guard = await mu.lock(); 321 const current = pos.load(); 322 pos.store({ x: current.x + dx, y: current.y + dy }); 323 }, 324); 325``` 326 327```ts 328// main.ts 329import { workers, shared, int32, mutex } from 'moroutine'; 330import { updatePosition } from './update-position.ts'; 331 332const mu = mutex(); 333const pos = shared({ x: int32, y: int32 }); 334 335{ 336 using run = workers(4); 337 await run([updatePosition(mu, pos, 1, 0), updatePosition(mu, pos, 0, 1)]); 338} 339 340console.log(pos.load()); // { x: 1, y: 1 } 341``` 342 343## Streaming 344 345### Streaming Moroutines 346 347Wrap an `async function*` with `mo()` to create a streaming moroutine. Values are streamed between threads via `MessageChannel` with pause/resume backpressure. 348 349```ts 350// count.ts 351import { mo } from 'moroutine'; 352 353export const countUp = mo(import.meta, async function* (n: number) { 354 for (let i = 0; i < n; i++) { 355 yield i; 356 } 357}); 358``` 359 360Iterate directly (dedicated worker) or dispatch via a pool: 361 362```ts 363import { workers } from 'moroutine'; 364import { countUp } from './count.ts'; 365 366// Dedicated worker 367for await (const n of countUp(5)) { 368 console.log(n); // 0, 1, 2, 3, 4 369} 370 371// Worker pool 372{ 373 using run = workers(2); 374 for await (const n of run(countUp(5))) { 375 console.log(n); // 0, 1, 2, 3, 4 376 } 377} 378``` 379 380### `channel()` and Fan-out 381 382When you pass the same `AsyncIterable` or streaming task argument to multiple tasks, each task gets its own copy of the data. Use `channel()` to share a single source across multiple workers — each item goes to exactly one consumer (work stealing). 383 384```ts 385import { workers, channel, assign, mo } from 'moroutine'; 386 387const generate = mo(import.meta, async function* (n: number) { 388 for (let i = 0; i < n; i++) yield i; 389}); 390 391const process = mo(import.meta, async (input: AsyncIterable<number>): Promise<number[]> => { 392 const items: number[] = []; 393 for await (const n of input) items.push(n); 394 return items; 395}); 396``` 397 398```ts 399const ch = channel(generate(100)); 400 401{ 402 using run = workers(); 403 const fanout = run.workers.map((w) => { 404 return assign(w, process(ch)); 405 }); 406 const results = await run(fanout); 407 // Items distributed across workers — no duplicates, no gaps 408} 409``` 410 411Use `assign(worker, task)` to pin a task to a specific worker. `run.workers` is a read-only array of worker handles, one per pool worker. 412 413Without `channel()`, `AsyncIterable` and streaming task arguments are auto-detected and streamed to a single consumer. `channel()` is only needed for fan-out. 414 415### `map()` — Bounded Fan-out 416 417Dispatch a stream of tasks to a pool with bounded concurrency, yielding results in completion order. Wrap each task with `inert()` so it passes through the stream as-is instead of being auto-awaited. 418 419```ts 420// main.ts 421import { readdir } from 'node:fs/promises'; 422import { join } from 'node:path'; 423import { workers, map, inert } from 'moroutine'; 424import type { Task } from 'moroutine'; 425import { hashFile, type FileHash } from './hash-file.ts'; 426 427{ 428 using run = workers(); 429 for await (const { path, hash } of map(run, walk('./src'), { concurrency: 4 })) { 430 console.log(`${hash.slice(0, 12)} ${path}`); 431 } 432} 433 434async function* walk(dir: string): AsyncGenerator<Task<FileHash>> { 435 for (const entry of await readdir(dir, { withFileTypes: true })) { 436 const p = join(dir, entry.name); 437 if (entry.isDirectory()) { 438 yield* walk(p); 439 } else { 440 yield inert(hashFile(p)); 441 } 442 } 443} 444``` 445 446```ts 447// hash-file.ts 448import { readFile } from 'node:fs/promises'; 449import { createHash } from 'node:crypto'; 450import { mo } from 'moroutine'; 451 452export type FileHash = { path: string; hash: string }; 453 454export const hashFile = mo(import.meta, async (path: string): Promise<FileHash> => { 455 const buf = await readFile(path); 456 return { path, hash: createHash('sha256').update(buf).digest('hex') }; 457}); 458``` 459 460`map()` accepts a sync iterable, async iterable, or generator of tasks. `concurrency` caps in-flight dispatches (default `1`). Mixed task types unify: `map` over `Task<string> | Task<number>` yields `string | number`. An optional `signal` aborts iteration — and, since moroutine auto-transfers `AbortSignal` args, the same signal passed to tasks will also cancel in-flight work. 461 462### Pipelines 463 464Chain streaming moroutines by passing one as an argument to the next. Each stage runs on its own dedicated worker. 465 466```ts 467const doubled = double(generate(5)); 468const squared = square(doubled); 469for await (const n of squared) { 470 console.log(n); 471} 472``` 473 474## Transfers 475 476Use `transfer()` for zero-copy movement of `ArrayBuffer`, `TypedArray`, `MessagePort`, or streams. 477 478```ts 479import { transfer } from 'moroutine'; 480 481const buf = new ArrayBuffer(1024); 482await run(processData(transfer(buf))); 483// buf is now detached (zero-length) — ownership moved to worker 484``` 485 486Return values from workers are auto-transferred when possible. 487 488## Examples 489 490All examples require Node v24+ and can be run directly, e.g. `node examples/primes/main.ts`. 491 492- [`examples/primes`](examples/primes) -- CPU-bound prime checking on a dedicated worker 493- [`examples/non-blocking`](examples/non-blocking) -- main thread stays responsive during heavy computation 494- [`examples/parallel-batch`](examples/parallel-batch) -- sequential vs parallel batch processing 495- [`examples/atomics`](examples/atomics) -- shared atomic counter across workers 496- [`examples/shared-state`](examples/shared-state) -- mutex-protected shared struct 497- [`examples/multi-module`](examples/multi-module) -- moroutines from multiple modules on one worker 498- [`examples/transfer`](examples/transfer) -- zero-copy buffer transfer to and from a worker 499- [`examples/sqlite`](examples/sqlite) -- shared SQLite database on a worker via task-arg caching 500- [`examples/pipeline`](examples/pipeline) -- streaming pipeline across dedicated workers 501- [`examples/channel-fanout`](examples/channel-fanout) -- fan-out a channel to multiple workers via work stealing 502- [`examples/bounded-map`](examples/bounded-map) -- recursive tree walk hashing files with `map()` 503- [`examples/load-balancing`](examples/load-balancing) -- round-robin vs least-busy with variable-cost tasks 504- [`examples/worker-affinity`](examples/worker-affinity) -- custom balancer using `isTask()` to route tasks by key 505- [`examples/benchmark`](examples/benchmark) -- roundtrip channel throughput with 1–N workers