# moroutine

> Offload functions to worker threads with shared memory primitives for Node.js.

## Installation

```sh
npm install moroutine
```

Requires Node.js v24+.

## Quick Start

```ts
// main.ts
import { isPrime } from './is-prime.ts';

const result = await isPrime(999_999_937);
console.log(result); // true
```

```ts
// is-prime.ts
import { mo } from 'moroutine';

export const isPrime = mo(import.meta, (n: number): boolean => {
  if (n < 2) return false;
  for (let i = 2; i * i <= n; i++) {
    if (n % i === 0) return false;
  }
  return true;
});
```

Define a function with `mo()` in its own module, then import and run it on a worker pool. Moroutine modules must be side-effect free — workers import them to find the registered functions.

## Core API

### `mo(import.meta, fn)`

Wraps a function so it runs on a worker thread. The function must be defined at module scope (not dynamically).

```ts
// math.ts
import { mo } from 'moroutine';

export const add = mo(import.meta, (a: number, b: number): number => {
  return a + b;
});
```

### `workers(size?, opts?)`

Creates a pool of worker threads. Returns a `Runner` that dispatches tasks. Disposable via `using` or `[Symbol.dispose]()`. Defaults to `os.availableParallelism()` workers and round-robin scheduling when arguments are omitted.

```ts
import { workers } from 'moroutine';
import { add } from './math.ts';

{
  using run = workers(2);

  const result = await run(add(3, 4)); // single task
  const [a, b] = await run([add(1, 2), add(3, 4)]); // batch
}
```

#### Graceful Shutdown

Use `await using` for graceful async shutdown. The pool exposes a `signal` that fires when disposal begins — thread it into tasks for cooperative cancellation.

```ts
{
  await using run = workers(4);

  run(longTask(run.signal)); // task can react to abort
  run(otherTask()); // runs to completion
}
// signal fired, waited for both tasks, then workers terminated
```

Use `using` (without `await`) for immediate termination of the pool, skipping the graceful-shutdown wait.

A `shutdownTimeout` option force-terminates workers if graceful shutdown takes too long:

```ts
{
  await using run = workers(4, { shutdownTimeout: 5000 });
  // ...
}
```

#### Load Balancing

The pool uses round-robin scheduling by default. Pass a `balance` option to change the strategy:

```ts
import { workers, leastBusy } from 'moroutine';

{
  using run = workers(4, { balance: leastBusy() });
  // tasks dispatched to whichever worker has the fewest in-flight tasks
}
```

Built-in balancers:

- `roundRobin()` — cycles through workers in order (default)
- `leastBusy()` — picks the worker with the lowest active task count

Custom balancers implement the `Balancer` interface:

```ts
import type { Balancer, WorkerHandle, Task } from 'moroutine';

const random: Balancer = {
  select(workers: readonly WorkerHandle[], task: Task) {
    return workers[Math.floor(Math.random() * workers.length)];
  },
};
```

Each `WorkerHandle` exposes `activeCount` (in-flight tasks) and `thread` (the underlying `worker_threads.Worker`) for building custom strategies.

`isTask(moroutine, task)` narrows a task to the descriptor type produced by a specific moroutine — useful inside a balancer to route by task kind or by a key in the args. For example, a worker-affinity balancer can hash a shard key out of the args so that every call for the same key hits the worker that already has its state loaded:

```ts
import { isTask, roundRobin } from 'moroutine';
import type { Balancer } from 'moroutine';
import { increment, read } from './counter.ts';

export function keyAffinity(): Balancer {
  const fallback = roundRobin();
  return {
    select(workers, task) {
      let key: string | undefined;
      if (isTask(increment, task)) key = task.args[0];
      else if (isTask(read, task)) key = task.args[0];
      if (key === undefined) return fallback.select(workers, task);
      return workers[hash(key) % workers.length];
    },
  };
}
```

Inside the `isTask` branch, `task.args` is typed as the moroutine's argument tuple (e.g. `[key: string, n: number]` for `increment`). See [`examples/worker-affinity`](examples/worker-affinity) for the full demo, including per-worker state that only stays consistent under affinity routing.
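
The `hash` helper in the balancer above is left undefined; any stable string-to-integer hash works. A minimal FNV-1a sketch (the name matches the call above, but the implementation is just one reasonable choice):

```typescript
// Hypothetical hash() for the keyAffinity balancer: FNV-1a over the
// key's UTF-16 code units, kept in unsigned 32-bit range so that
// hash(key) % workers.length is always a valid index.
export function hash(key: string): number {
  let h = 0x811c9dc5; // FNV-1a offset basis
  for (let i = 0; i < key.length; i++) {
    h ^= key.charCodeAt(i);
    h = Math.imul(h, 0x01000193) >>> 0; // multiply by FNV prime, wrap to uint32
  }
  return h >>> 0;
}
```

Same key, same index: repeated calls for one shard land on the same worker as long as the pool size stays stable.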

### Dedicated Workers

Awaiting a task directly (without a pool) runs it on a dedicated worker thread, one per moroutine function.

```ts
const result = await add(3, 4); // runs on a dedicated worker for `add`
```

### Task-Args

Pass a task as an argument to another task. The result is resolved on the worker and cached, so it never crosses back to the main thread. This is useful for non-transferable context like a database connection.

```ts
// db.ts
import { DatabaseSync } from 'node:sqlite';
import { mo } from 'moroutine';

export const openDb = mo(import.meta, (filename: string): DatabaseSync => {
  return new DatabaseSync(filename);
});

export const exec = mo(import.meta, (db: DatabaseSync, sql: string): void => {
  db.exec(sql);
});

export const query = mo(import.meta, (db: DatabaseSync, sql: string): unknown[] => {
  return db.prepare(sql).all();
});
```

```ts
import { workers } from 'moroutine';
import { openDb, exec, query } from './db.ts';

const db = openDb(':memory:');

{
  using run = workers(1);
  await run(exec(db, `CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)`));
  await run(exec(db, `INSERT INTO users (name) VALUES ('Alice')`));
  const rows = await run(query(db, 'SELECT * FROM users')); // [{ id: 1, name: 'Alice' }]
}
```

`openDb()` returns a `Task<DatabaseSync>`, and `exec()`/`query()` accept it in place of `DatabaseSync`. The database is opened once on the worker and reused for every subsequent call — the main thread never touches it.

## Shared Memory

### Descriptors and `shared()`

Shared-memory types are created with descriptor functions or the `shared()` allocator.

```ts
import { shared, int32, int32atomic, int64, bool, mutex, rwlock, string, bytes } from 'moroutine';
```

#### Primitives

```ts
const counter = int32(); // standalone Int32
const flag = bool(); // standalone Bool
const big = int64(); // standalone Int64 (bigint)
```

#### Atomics

Atomic variants use `Atomics.*` for thread-safe operations without a lock.

```ts
const counter = int32atomic();
counter.add(1); // atomic increment, returns previous value
counter.load(); // atomic read
```

Full atomic operations: `load`, `store`, `add`, `sub`, `and`, `or`, `xor`, `exchange`, `compareExchange`.
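
These names and return conventions mirror the standard `Atomics` API. As a reference for the semantics (plain Node, no moroutine involved), here are the same operations on an `Int32Array` over a `SharedArrayBuffer`:

```typescript
// Standard Atomics semantics that the atomic descriptors mirror:
// read-modify-write operations return the value held *before* the update.
const arr = new Int32Array(new SharedArrayBuffer(4));

Atomics.store(arr, 0, 10);
const before = Atomics.add(arr, 0, 5); // returns 10; slot is now 15
const swapped = Atomics.compareExchange(arr, 0, 15, 99); // expected matches: returns 15, slot becomes 99
const missed = Atomics.compareExchange(arr, 0, 15, 0); // expected is stale: returns 99, slot unchanged
```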

`Int32Atomic` additionally exposes futex-style wait / wake:

```ts
const slot = int32atomic();

// In one thread — wait until the slot's value differs from 0,
// with an optional timeout.
await slot.waitAsync(0); // 'ok' | 'not-equal' | 'timed-out'
await slot.waitAsync(0, 100 /* ms */); // with timeout

// In another thread — change the value and wake waiters.
slot.store(1);
slot.notify(); // wakes all waiters on this slot
slot.notify(2); // wakes at most 2
```

`waitAsync` returns `'not-equal'` synchronously (no microtask hop) if the slot already holds a value different from `expected`. The underlying `Atomics.wait`/`notify` futex is available only on `Int32Atomic`.

#### Structs

Plain objects in `shared()` create structs backed by a single `SharedArrayBuffer`.

```ts
const point = shared({ x: int32, y: int32 });

point.load(); // { x: 0, y: 0 }
point.store({ x: 10, y: 20 });
point.fields.x.store(10); // direct field access
```

Structs nest:

```ts
const rect = shared({
  pos: { x: int32, y: int32 },
  size: { w: int32, h: int32 },
});
```

#### Tuples

Arrays in `shared()` create fixed-length tuples.

```ts
const pair = shared([int32, bool]);
pair.load(); // [0, false]
pair.store([42, true]);
pair.elements[0].store(99);
```

#### Bytes and Strings

```ts
const buf = bytes(32); // fixed 32-byte buffer
buf.store(new Uint8Array(32)); // exact length required
buf.load(); // Readonly<Uint8Array> view
buf.view[0] = 0xff; // direct mutable access

const name = string(64); // UTF-8, max 64 bytes
name.store('hello');
name.load(); // 'hello'
```

#### Value Shorthand

Primitive values in schemas infer their type.

```ts
shared(0); // Int32 initialized to 0
shared(true); // Bool initialized to true
shared(0n); // Int64 initialized to 0n
shared({ x: 10, y: 20 }); // struct with Int32 fields
```

### Locks

#### Mutex

```ts
const mu = mutex();

using guard = await mu.lock();
// exclusive access
// auto-unlocks when guard is disposed

// or manually:
await mu.lock();
mu.unlock();

// Non-blocking attempt — returns null if held elsewhere.
using attempt = mu.tryLock();
if (!attempt) return;
```

#### RwLock

```ts
const rw = rwlock();

using reader = await rw.readLock(); // multiple readers OK
using writer = await rw.writeLock(); // exclusive access

// Non-blocking variants — return null if unavailable.
using r = rw.tryReadLock(); // null if write-locked
using w = rw.tryWriteLock(); // null if any lock is held
```

### Using with Workers

Shared-memory types pass through `postMessage` automatically. They're reconstructed on the worker side with the same shared backing memory.

```ts
// update-position.ts
import { mo } from 'moroutine';
import type { Mutex, SharedStruct, Int32 } from 'moroutine';

type Position = SharedStruct<{ x: Int32; y: Int32 }>;

export const updatePosition = mo(
  import.meta,
  async (mu: Mutex, pos: Position, dx: number, dy: number): Promise<void> => {
    using guard = await mu.lock();
    const current = pos.load();
    pos.store({ x: current.x + dx, y: current.y + dy });
  },
);
```

```ts
// main.ts
import { workers, shared, int32, mutex } from 'moroutine';
import { updatePosition } from './update-position.ts';

const mu = mutex();
const pos = shared({ x: int32, y: int32 });

{
  using run = workers(4);
  await run([updatePosition(mu, pos, 1, 0), updatePosition(mu, pos, 0, 1)]);
}

console.log(pos.load()); // { x: 1, y: 1 }
```

## Streaming

### Streaming Moroutines

Wrap an `async function*` with `mo()` to create a streaming moroutine. Values are streamed between threads via `MessageChannel` with atomics-based backpressure — the producer parks when `highWaterMark` items (default 16) sit in flight and resumes when the consumer drains below the cap.
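
A single-process sketch of the high-water-mark idea (illustrative only; a promise-based stand-in, not moroutine's actual atomics handshake):

```typescript
// Illustrative high-water-mark backpressure: push() parks while
// `highWaterMark` items are in flight, and resumes when the consumer
// drains one. Hypothetical helper, not part of moroutine's API.
class BoundedQueue<T> {
  private readonly items: T[] = [];
  private readonly parked: Array<() => void> = [];

  constructor(private readonly highWaterMark = 16) {}

  async push(item: T): Promise<void> {
    while (this.items.length >= this.highWaterMark) {
      // Park until the consumer signals a drain.
      await new Promise<void>((wake) => this.parked.push(wake));
    }
    this.items.push(item);
  }

  shift(): T | undefined {
    const item = this.items.shift();
    this.parked.shift()?.(); // wake one parked producer
    return item;
  }

  get size(): number {
    return this.items.length;
  }
}
```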

```ts
// count.ts
import { mo } from 'moroutine';

export const countUp = mo(import.meta, async function* (n: number) {
  for (let i = 0; i < n; i++) {
    yield i;
  }
});
```

Iterate directly (dedicated worker) or dispatch via a pool:

```ts
import { workers } from 'moroutine';
import { countUp } from './count.ts';

// Dedicated worker
for await (const n of countUp(5)) {
  console.log(n); // 0, 1, 2, 3, 4
}

// Worker pool
{
  using run = workers(2);
  for await (const n of run(countUp(5))) {
    console.log(n); // 0, 1, 2, 3, 4
  }
}
```

### `channel()` and Fan-out

When you pass the same `AsyncIterable` or streaming task argument to multiple tasks, each task gets its own copy of the data. Use `channel()` to share a single source across multiple workers — each item goes to exactly one consumer (work stealing).

```ts
import { workers, channel, assign, mo } from 'moroutine';

const generate = mo(import.meta, async function* (n: number) {
  for (let i = 0; i < n; i++) yield i;
});

// named `consume` to avoid shadowing Node's global `process`
const consume = mo(import.meta, async (input: AsyncIterable<number>): Promise<number[]> => {
  const items: number[] = [];
  for await (const n of input) items.push(n);
  return items;
});
```

```ts
const ch = channel(generate(100));

{
  using run = workers();
  const fanout = run.workers.map((w) => {
    return assign(w, consume(ch));
  });
  const results = await run(fanout);
  // Items distributed across workers — no duplicates, no gaps
}
```

Use `assign(worker, task)` to pin a task to a specific worker. `run.workers` is a read-only array of worker handles, one per pool worker.

Without `channel()`, `AsyncIterable` and streaming task arguments are auto-detected and streamed to a single consumer. `channel()` is only needed for fan-out.

### `map()` — Bounded Fan-out

Dispatch a stream of tasks to a pool with bounded concurrency, yielding results in completion order. Wrap each task with `inert()` so it passes through the stream as-is instead of being auto-awaited.

```ts
// main.ts
import { readdir } from 'node:fs/promises';
import { join } from 'node:path';
import { workers, map, inert } from 'moroutine';
import type { Task } from 'moroutine';
import { hashFile, type FileHash } from './hash-file.ts';

{
  using run = workers();
  for await (const { path, hash } of map(run, walk('./src'), { concurrency: 4 })) {
    console.log(`${hash.slice(0, 12)} ${path}`);
  }
}

async function* walk(dir: string): AsyncGenerator<Task<FileHash>> {
  for (const entry of await readdir(dir, { withFileTypes: true })) {
    const p = join(dir, entry.name);
    if (entry.isDirectory()) {
      yield* walk(p);
    } else {
      yield inert(hashFile(p));
    }
  }
}
```

```ts
// hash-file.ts
import { readFile } from 'node:fs/promises';
import { createHash } from 'node:crypto';
import { mo } from 'moroutine';

export type FileHash = { path: string; hash: string };

export const hashFile = mo(import.meta, async (path: string): Promise<FileHash> => {
  const buf = await readFile(path);
  return { path, hash: createHash('sha256').update(buf).digest('hex') };
});
```

`map()` accepts a sync iterable, async iterable, or generator of tasks. `concurrency` caps in-flight dispatches (default `1`). Mixed task types unify: `map` over `Task<string> | Task<number>` yields `string | number`. An optional `signal` aborts iteration — and, since moroutine auto-transfers `AbortSignal` args, the same signal passed to tasks will also cancel in-flight work.
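
Bounded dispatch with completion-order yielding is the usual race-a-bounded-window pattern. A generic, moroutine-free sketch of it (hypothetical helper; simplified, no abort handling):

```typescript
// Bounded fan-out over lazy async jobs, yielding in completion order.
// Each in-flight promise is tagged with a key so the finished one can be
// removed from the window after Promise.race.
async function* mapBounded<T>(
  jobs: Iterable<() => Promise<T>>,
  concurrency: number,
): AsyncGenerator<T> {
  const inFlight = new Map<symbol, Promise<[symbol, T]>>();

  const settleOne = async (): Promise<T> => {
    const [key, value] = await Promise.race(inFlight.values());
    inFlight.delete(key);
    return value;
  };

  for (const job of jobs) {
    const key = Symbol();
    inFlight.set(key, job().then((value): [symbol, T] => [key, value]));
    if (inFlight.size >= concurrency) yield await settleOne();
  }
  while (inFlight.size > 0) yield await settleOne();
}
```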

### Pipelines

Chain streaming moroutines by passing one as an argument to the next. Each stage runs on its own dedicated worker.

```ts
// generate, double, and square are streaming moroutines defined with mo()
const doubled = double(generate(5));
const squared = square(doubled);
for await (const n of squared) {
  console.log(n);
}
```

## Transfers

Use `transfer()` for zero-copy movement of `ArrayBuffer`, `TypedArray`, `MessagePort`, or streams.

```ts
import { transfer } from 'moroutine';

const buf = new ArrayBuffer(1024);
await run(processData(transfer(buf)));
// buf is now detached (zero-length) — ownership moved to worker
```

Return values from workers are auto-transferred when possible.
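
Detaching follows the standard structured-clone transfer semantics; the same effect is visible with plain `structuredClone` (no moroutine involved):

```typescript
// Transferring an ArrayBuffer detaches the source: the receiving side
// gets the bytes, and the original becomes zero-length.
const src = new ArrayBuffer(1024);
const received = structuredClone(src, { transfer: [src] });

// received.byteLength === 1024, src.byteLength === 0
```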

## Examples

All examples require Node v24+ and can be run directly, e.g. `node examples/primes/main.ts`.

- [`examples/primes`](examples/primes) -- CPU-bound prime checking on a dedicated worker
- [`examples/non-blocking`](examples/non-blocking) -- main thread stays responsive during heavy computation
- [`examples/parallel-batch`](examples/parallel-batch) -- sequential vs parallel batch processing
- [`examples/atomics`](examples/atomics) -- shared atomic counter across workers
- [`examples/shared-state`](examples/shared-state) -- mutex-protected shared struct
- [`examples/multi-module`](examples/multi-module) -- moroutines from multiple modules on one worker
- [`examples/transfer`](examples/transfer) -- zero-copy buffer transfer to and from a worker
- [`examples/sqlite`](examples/sqlite) -- shared SQLite database on a worker via task-arg caching
- [`examples/pipeline`](examples/pipeline) -- streaming pipeline across dedicated workers
- [`examples/channel-fanout`](examples/channel-fanout) -- fan-out a channel to multiple workers via work stealing
- [`examples/bounded-map`](examples/bounded-map) -- recursive tree walk hashing files with `map()`
- [`examples/load-balancing`](examples/load-balancing) -- round-robin vs least-busy with variable-cost tasks
- [`examples/worker-affinity`](examples/worker-affinity) -- custom balancer using `isTask()` to route tasks by key
- [`examples/benchmark`](examples/benchmark) -- roundtrip channel throughput with 1–N workers