Offload functions to worker threads with shared memory primitives for Node.js.
# moroutine

> Offload functions to worker threads with shared memory primitives for Node.js.

## Installation

```sh
npm install moroutine
```

Requires Node.js v24+.

## Quick Start

```ts
// main.ts
import { isPrime } from './is-prime.ts';

const result = await isPrime(999_999_937);
console.log(result); // true
```

```ts
// is-prime.ts
import { mo } from 'moroutine';

export const isPrime = mo(import.meta, (n: number): boolean => {
  if (n < 2) return false;
  for (let i = 2; i * i <= n; i++) {
    if (n % i === 0) return false;
  }
  return true;
});
```

Define a function with `mo()` in its own module, then import and run it on a worker pool. Moroutine modules must be side-effect free — workers import them to find the registered functions.

## Core API

### `mo(import.meta, fn)`

Wraps a function so it runs on a worker thread. The function must be defined at module scope (not dynamically).

```ts
// math.ts
import { mo } from 'moroutine';

export const add = mo(import.meta, (a: number, b: number): number => {
  return a + b;
});
```

### `workers(size?, opts?)`

Creates a pool of worker threads. Returns a `Runner` that dispatches tasks. Disposable via `using` or `[Symbol.dispose]()`. Defaults to `os.availableParallelism()` workers and round-robin scheduling when arguments are omitted.

```ts
import { workers } from 'moroutine';
import { add } from './math.ts';

{
  using run = workers(2);

  const result = await run(add(3, 4)); // single task
  const [a, b] = await run([add(1, 2), add(3, 4)]); // batch
}
```

#### Graceful Shutdown

Use `await using` for graceful async shutdown. The pool exposes a `signal` that fires when disposal begins — thread it into tasks for cooperative cancellation.

```ts
{
  await using run = workers(4);

  run(longTask(run.signal)); // task can react to abort
  run(otherTask()); // runs to completion
}
// signal fired, waited for both tasks, then workers terminated
```

Use `using` (without `await`) for immediate termination, same as before.

A `shutdownTimeout` option force-terminates workers if graceful shutdown takes too long:

```ts
{
  await using run = workers(4, { shutdownTimeout: 5000 });
  // ...
}
```

#### Load Balancing

The pool uses round-robin scheduling by default. Pass a `balance` option to change the strategy:

```ts
import { workers, leastBusy } from 'moroutine';

{
  using run = workers(4, { balance: leastBusy() });
  // tasks dispatched to whichever worker has the fewest in-flight tasks
}
```

Built-in balancers:

- `roundRobin()` — cycles through workers in order (default)
- `leastBusy()` — picks the worker with the lowest active task count

Custom balancers implement the `Balancer` interface:

```ts
import type { Balancer, WorkerHandle, Task } from 'moroutine';

const random: Balancer = {
  select(workers: readonly WorkerHandle[], task: Task) {
    return workers[Math.floor(Math.random() * workers.length)];
  },
};
```

Each `WorkerHandle` exposes `activeCount` (in-flight tasks) and `thread` (the underlying `worker_threads.Worker`) for building custom strategies.

`isTask(moroutine, task)` narrows a task to the descriptor type produced by a specific moroutine — useful inside a balancer to route by task kind or by a key in the args.
For example, a worker-affinity balancer can hash a shard key out of the args so that every call for the same key hits the worker that already has its state loaded:

```ts
import { isTask, roundRobin } from 'moroutine';
import type { Balancer } from 'moroutine';
import { increment, read } from './counter.ts';

export function keyAffinity(): Balancer {
  const fallback = roundRobin();
  return {
    select(workers, task) {
      let key: string | undefined;
      if (isTask(increment, task)) key = task.args[0];
      else if (isTask(read, task)) key = task.args[0];
      if (key === undefined) return fallback.select(workers, task);
      return workers[hash(key) % workers.length];
    },
  };
}
```

Inside the `isTask` branch, `task.args` is typed as the moroutine's argument tuple (e.g. `[key: string, n: number]` for `increment`). See [`examples/worker-affinity`](examples/worker-affinity) for the full demo, including per-worker state that only stays consistent under affinity routing.

### Dedicated Workers

Awaiting a task directly (without a pool) runs it on a dedicated worker thread, one per moroutine function.

```ts
const result = await add(3, 4); // runs on a dedicated worker for `add`
```

### Task-Args

Pass a task as an argument to another task. The result is resolved on the worker and cached, so it never crosses back to the main thread. This is useful for non-transferable context like a database connection.

```ts
// db.ts
import { DatabaseSync } from 'node:sqlite';
import { mo } from 'moroutine';

export const openDb = mo(import.meta, (filename: string): DatabaseSync => {
  return new DatabaseSync(filename);
});

export const exec = mo(import.meta, (db: DatabaseSync, sql: string): void => {
  db.exec(sql);
});

export const query = mo(import.meta, (db: DatabaseSync, sql: string): unknown[] => {
  return db.prepare(sql).all();
});
```

```ts
import { workers } from 'moroutine';
import { openDb, exec, query } from './db.ts';

const db = openDb(':memory:');

{
  using run = workers(1);
  await run(exec(db, `CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)`));
  await run(exec(db, `INSERT INTO users (name) VALUES ('Alice')`));
  const rows = await run(query(db, 'SELECT * FROM users')); // [{ id: 1, name: 'Alice' }]
}
```

`openDb()` returns a `Task<DatabaseSync>`, and `exec()`/`query()` accept it in place of `DatabaseSync`. The database is opened once on the worker and reused for every subsequent call — the main thread never touches it.

## Shared Memory

### Descriptors and `shared()`

Shared-memory types are created with descriptor functions or the `shared()` allocator.

```ts
import { shared, int32, bool, mutex, string, bytes } from 'moroutine';
```

#### Primitives

```ts
const counter = int32(); // standalone Int32
const flag = bool(); // standalone Bool
const big = int64(); // standalone Int64 (bigint)
```

#### Atomics

Atomic variants use `Atomics.*` for thread-safe operations without a lock.

```ts
const counter = int32atomic();
counter.add(1); // atomic increment, returns previous value
counter.load(); // atomic read
```

Full atomic operations: `load`, `store`, `add`, `sub`, `and`, `or`, `xor`, `exchange`, `compareExchange`.
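
For intuition about how `compareExchange` is typically used, here is a minimal sketch of a lock-free "raise to maximum" update. It uses the standard `Atomics` API on a raw `SharedArrayBuffer` rather than moroutine's own types, and `atomicMax` is a hypothetical helper name chosen for illustration, not part of the library.

```ts
// Sketch only: plain Atomics on a SharedArrayBuffer, not moroutine's API.
const cell = new Int32Array(new SharedArrayBuffer(4));

// Hypothetical helper: raise the cell to `candidate` if it is larger,
// retrying when another thread changes the value between read and write.
function atomicMax(view: Int32Array, index: number, candidate: number): number {
  let seen = Atomics.load(view, index);
  while (candidate > seen) {
    const prev = Atomics.compareExchange(view, index, seen, candidate);
    if (prev === seen) return candidate; // swap succeeded
    seen = prev; // lost the race, retry against the newer value
  }
  return seen; // existing value was already >= candidate
}

Atomics.add(cell, 0, 5); // returns the previous value (0); cell now holds 5
atomicMax(cell, 0, 3); // no change, 5 >= 3
atomicMax(cell, 0, 9); // cell now holds 9
```

The loop retries only when another writer got in first, which is why this pattern needs no lock.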

`Int32Atomic` additionally exposes futex-style wait / wake:

```ts
const slot = int32atomic();

// In one thread — wait until the slot's value differs from 0,
// with an optional timeout.
await slot.waitAsync(0); // 'ok' | 'not-equal' | 'timed-out'
await slot.waitAsync(0, 100 /*ms*/); // with timeout

// In another thread — change the value and wake waiters.
slot.store(1);
slot.notify(); // wakes all waiters on this slot
slot.notify(2); // wakes at most 2
```

`waitAsync` returns `'not-equal'` synchronously (no microtask hop) if the slot already holds a value different from `expected`. The underlying `Atomics.wait`/`notify` futex is available only on `Int32Atomic`.

#### Structs

Plain objects in `shared()` create structs backed by a single `SharedArrayBuffer`.

```ts
const point = shared({ x: int32, y: int32 });

point.load(); // { x: 0, y: 0 }
point.store({ x: 10, y: 20 });
point.fields.x.store(10); // direct field access
```

Structs nest:

```ts
const rect = shared({
  pos: { x: int32, y: int32 },
  size: { w: int32, h: int32 },
});
```

#### Tuples

Arrays in `shared()` create fixed-length tuples.

```ts
const pair = shared([int32, bool]);
pair.load(); // [0, false]
pair.store([42, true]);
pair.elements[0].store(99);
```

#### Bytes and Strings

```ts
const buf = bytes(32); // fixed 32-byte buffer
buf.store(new Uint8Array(32)); // exact length required
buf.load(); // Readonly<Uint8Array> view
buf.view[0] = 0xff; // direct mutable access

const name = string(64); // UTF-8, max 64 bytes
name.store('hello');
name.load(); // 'hello'
```

#### Value Shorthand

Primitive values in schemas infer their type.

```ts
shared(0); // Int32 initialized to 0
shared(true); // Bool initialized to true
shared(0n); // Int64 initialized to 0n
shared({ x: 10, y: 20 }); // struct with Int32 fields
```

### Locks

#### Mutex

```ts
const mu = mutex();

using guard = await mu.lock();
// exclusive access
// auto-unlocks when guard is disposed

// or manually:
await mu.lock();
mu.unlock();

// Non-blocking attempt — returns null if held elsewhere.
using guard = mu.tryLock();
if (!guard) return;
```

#### RwLock

```ts
const rw = rwlock();

using guard = await rw.readLock(); // multiple readers OK
using guard = await rw.writeLock(); // exclusive access

// Non-blocking variants — return null if unavailable.
using r = rw.tryReadLock(); // null if write-locked
using w = rw.tryWriteLock(); // null if any lock is held
```

### Using with Workers

Shared-memory types pass through `postMessage` automatically. They're reconstructed on the worker side with the same shared backing memory.

```ts
// update-position.ts
import { mo } from 'moroutine';
import type { Mutex, SharedStruct, Int32 } from 'moroutine';

type Position = SharedStruct<{ x: Int32; y: Int32 }>;

export const updatePosition = mo(
  import.meta,
  async (mu: Mutex, pos: Position, dx: number, dy: number): Promise<void> => {
    using guard = await mu.lock();
    const current = pos.load();
    pos.store({ x: current.x + dx, y: current.y + dy });
  },
);
```

```ts
// main.ts
import { workers, shared, int32, mutex } from 'moroutine';
import { updatePosition } from './update-position.ts';

const mu = mutex();
const pos = shared({ x: int32, y: int32 });

{
  using run = workers(4);
  await run([updatePosition(mu, pos, 1, 0), updatePosition(mu, pos, 0, 1)]);
}

console.log(pos.load()); // { x: 1, y: 1 }
```

## Streaming

### Streaming Moroutines

Wrap an `async function*` with `mo()` to create a streaming moroutine. Values are streamed between threads via `MessageChannel` with atomics-based backpressure — the producer parks when `highWaterMark` items (default 16) sit in flight and resumes when the consumer drains below the cap.

```ts
// count.ts
import { mo } from 'moroutine';

export const countUp = mo(import.meta, async function* (n: number) {
  for (let i = 0; i < n; i++) {
    yield i;
  }
});
```

Iterate directly (dedicated worker) or dispatch via a pool:

```ts
import { workers } from 'moroutine';
import { countUp } from './count.ts';

// Dedicated worker
for await (const n of countUp(5)) {
  console.log(n); // 0, 1, 2, 3, 4
}

// Worker pool
{
  using run = workers(2);
  for await (const n of run(countUp(5))) {
    console.log(n); // 0, 1, 2, 3, 4
  }
}
```

### `channel()` and Fan-out

When you pass the same `AsyncIterable` or streaming task argument to multiple tasks, each task gets its own copy of the data. Use `channel()` to share a single source across multiple workers — each item goes to exactly one consumer (work stealing).

```ts
import { workers, channel, assign, mo } from 'moroutine';

const generate = mo(import.meta, async function* (n: number) {
  for (let i = 0; i < n; i++) yield i;
});

const process = mo(import.meta, async (input: AsyncIterable<number>): Promise<number[]> => {
  const items: number[] = [];
  for await (const n of input) items.push(n);
  return items;
});
```

```ts
const ch = channel(generate(100));

{
  using run = workers();
  const fanout = run.workers.map((w) => {
    return assign(w, process(ch));
  });
  const results = await run(fanout);
  // Items distributed across workers — no duplicates, no gaps
}
```

Use `assign(worker, task)` to pin a task to a specific worker. `run.workers` is a read-only array of worker handles, one per pool worker.

Without `channel()`, `AsyncIterable` and streaming task arguments are auto-detected and streamed to a single consumer. `channel()` is only needed for fan-out.
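
The "each item goes to exactly one consumer" behavior can be pictured, on a single thread, as several consumers pulling from one shared iterator. The sketch below is plain TypeScript and does not use moroutine or worker threads; `source`, `drain`, and `fanOut` are hypothetical names, and the library's actual cross-thread mechanism may differ.

```ts
// Sketch only: a single-threaded analogy, not moroutine's implementation.
async function* source(n: number): AsyncGenerator<number> {
  for (let i = 0; i < n; i++) yield i;
}

// Each consumer pulls from the SAME iterator, so an item handed to one
// consumer is never seen by another.
async function drain(it: AsyncIterator<number>): Promise<number[]> {
  const items: number[] = [];
  for (;;) {
    const step = await it.next();
    if (step.done) return items;
    items.push(step.value);
  }
}

// Start `consumers` drains against one shared iterator and collect
// what each of them received.
async function fanOut(n: number, consumers: number): Promise<number[][]> {
  const it = source(n)[Symbol.asyncIterator]();
  return Promise.all(Array.from({ length: consumers }, () => drain(it)));
}
```

Calling `fanOut(100, 4)` resolves to four arrays that together contain every value from 0 to 99 exactly once. How the items are split between consumers depends on scheduling.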

### `map()` — Bounded Fan-out

Dispatch a stream of tasks to a pool with bounded concurrency, yielding results in completion order. Wrap each task with `inert()` so it passes through the stream as-is instead of being auto-awaited.

```ts
// main.ts
import { readdir } from 'node:fs/promises';
import { join } from 'node:path';
import { workers, map, inert } from 'moroutine';
import type { Task } from 'moroutine';
import { hashFile, type FileHash } from './hash-file.ts';

{
  using run = workers();
  for await (const { path, hash } of map(run, walk('./src'), { concurrency: 4 })) {
    console.log(`${hash.slice(0, 12)} ${path}`);
  }
}

async function* walk(dir: string): AsyncGenerator<Task<FileHash>> {
  for (const entry of await readdir(dir, { withFileTypes: true })) {
    const p = join(dir, entry.name);
    if (entry.isDirectory()) {
      yield* walk(p);
    } else {
      yield inert(hashFile(p));
    }
  }
}
```

```ts
// hash-file.ts
import { readFile } from 'node:fs/promises';
import { createHash } from 'node:crypto';
import { mo } from 'moroutine';

export type FileHash = { path: string; hash: string };

export const hashFile = mo(import.meta, async (path: string): Promise<FileHash> => {
  const buf = await readFile(path);
  return { path, hash: createHash('sha256').update(buf).digest('hex') };
});
```

`map()` accepts a sync iterable, async iterable, or generator of tasks. `concurrency` caps in-flight dispatches (default `1`). Mixed task types unify: `map` over `Task<string> | Task<number>` yields `string | number`. An optional `signal` aborts iteration — and, since moroutine auto-transfers `AbortSignal` args, the same signal passed to tasks will also cancel in-flight work.

### Pipelines

Chain streaming moroutines by passing one as an argument to the next. Each stage runs on its own dedicated worker.

```ts
const doubled = double(generate(5));
const squared = square(doubled);
for await (const n of squared) {
  console.log(n);
}
```

## Transfers

Use `transfer()` for zero-copy movement of `ArrayBuffer`, `TypedArray`, `MessagePort`, or streams.

```ts
import { transfer } from 'moroutine';

const buf = new ArrayBuffer(1024);
await run(processData(transfer(buf)));
// buf is now detached (zero-length) — ownership moved to worker
```

Return values from workers are auto-transferred when possible.

## Examples

All examples require Node v24+ and can be run directly, e.g. `node examples/primes/main.ts`.

- [`examples/primes`](examples/primes) -- CPU-bound prime checking on a dedicated worker
- [`examples/non-blocking`](examples/non-blocking) -- main thread stays responsive during heavy computation
- [`examples/parallel-batch`](examples/parallel-batch) -- sequential vs parallel batch processing
- [`examples/atomics`](examples/atomics) -- shared atomic counter across workers
- [`examples/shared-state`](examples/shared-state) -- mutex-protected shared struct
- [`examples/multi-module`](examples/multi-module) -- moroutines from multiple modules on one worker
- [`examples/transfer`](examples/transfer) -- zero-copy buffer transfer to and from a worker
- [`examples/sqlite`](examples/sqlite) -- shared SQLite database on a worker via task-arg caching
- [`examples/pipeline`](examples/pipeline) -- streaming pipeline across dedicated workers
- [`examples/channel-fanout`](examples/channel-fanout) -- fan-out a channel to multiple workers via work stealing
- [`examples/bounded-map`](examples/bounded-map) -- recursive tree walk hashing files with `map()`
- [`examples/load-balancing`](examples/load-balancing) -- round-robin vs least-busy with variable-cost tasks
- [`examples/worker-affinity`](examples/worker-affinity) -- custom balancer using `isTask()` to route tasks by key
- [`examples/benchmark`](examples/benchmark) -- roundtrip channel throughput with 1–N workers