# moroutine

> Offload functions to worker threads with shared memory primitives for Node.js.

## Installation

```sh
npm install moroutine
```

Requires Node.js v24+.

## Quick Start

```ts
// main.ts
import { isPrime } from './is-prime.ts';

const result = await isPrime(999_999_937);
console.log(result); // true
```

```ts
// is-prime.ts
import { mo } from 'moroutine';

export const isPrime = mo(import.meta, (n: number): boolean => {
  if (n < 2) return false;
  for (let i = 2; i * i <= n; i++) {
    if (n % i === 0) return false;
  }
  return true;
});
```

Define a function with `mo()` in its own module, then import and run it on a worker pool. Moroutine modules must be side-effect free — workers import them to find the registered functions.

## Core API

### `mo(import.meta, fn)`

Wraps a function so it runs on a worker thread. The function must be defined at module scope (not dynamically).

```ts
// math.ts
import { mo } from 'moroutine';

export const add = mo(import.meta, (a: number, b: number): number => {
  return a + b;
});
```

### `workers(size?, opts?)`

Creates a pool of worker threads. Returns a `Runner` that dispatches tasks. Disposable via `using` or `[Symbol.dispose]()`. Defaults to `os.availableParallelism()` workers and round-robin scheduling when arguments are omitted.

```ts
import { workers } from 'moroutine';
import { add } from './math.ts';

{
  using run = workers(2);

  const result = await run(add(3, 4)); // single task
  const [a, b] = await run([add(1, 2), add(3, 4)]); // batch
}
```

#### Graceful Shutdown

Use `await using` for graceful async shutdown. The pool exposes a `signal` that fires when disposal begins — thread it into tasks for cooperative cancellation.

```ts
{
  await using run = workers(4);

  run(longTask(run.signal)); // task can react to abort
  run(otherTask()); // runs to completion
}
// signal fired, waited for both tasks, then workers terminated
```

Use `using` (without `await`) for immediate termination: workers are torn down without waiting for in-flight tasks to finish.
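
Inside a task, reacting to that signal is ordinary `AbortSignal` handling. A minimal sketch of a cancellable task body, in plain TypeScript (the internals of `longTask` here are an assumption for illustration, not taken from the library):

```typescript
// Sketch: a loop that checks the pool's signal between units of work.
// The signal is whatever you thread in from `run.signal`.
export async function longTask(signal: AbortSignal): Promise<number> {
  let processed = 0;
  for (let i = 0; i < 1_000; i++) {
    if (signal.aborted) break; // cooperative cancellation point
    processed++;
    // Yield to the event loop so an abort can be observed promptly.
    await new Promise((resolve) => setImmediate(resolve));
  }
  return processed;
}
```

Graceful shutdown then just waits for the loop to notice the abort and return.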

A `shutdownTimeout` option force-terminates workers if graceful shutdown takes too long:

```ts
{
  await using run = workers(4, { shutdownTimeout: 5000 });
  // ...
}
```

#### Load Balancing

The pool uses round-robin scheduling by default. Pass a `balance` option to change the strategy:

```ts
import { workers, leastBusy } from 'moroutine';

{
  using run = workers(4, { balance: leastBusy() });
  // tasks dispatched to whichever worker has the fewest in-flight tasks
}
```

Built-in balancers:

- `roundRobin()` — cycles through workers in order (default)
- `leastBusy()` — picks the worker with the lowest active task count

Custom balancers implement the `Balancer` interface:

```ts
import type { Balancer, WorkerHandle, Task } from 'moroutine';

const random: Balancer = {
  select(workers: readonly WorkerHandle[], task: Task) {
    return workers[Math.floor(Math.random() * workers.length)];
  },
};
```

Each `WorkerHandle` exposes `activeCount` (in-flight tasks) and `thread` (the underlying `worker_threads.Worker`) for building custom strategies.

`isTask(moroutine, task)` narrows a task to the descriptor type produced by a specific moroutine — useful inside a balancer to route by task kind or by a key in the args. For example, a worker-affinity balancer can hash a shard key out of the args so that every call for the same key hits the worker that already has its state loaded:

```ts
import { isTask, roundRobin } from 'moroutine';
import type { Balancer } from 'moroutine';
import { increment, read } from './counter.ts';

export function keyAffinity(): Balancer {
  const fallback = roundRobin();
  return {
    select(workers, task) {
      let key: string | undefined;
      if (isTask(increment, task)) key = task.args[0];
      else if (isTask(read, task)) key = task.args[0];
      if (key === undefined) return fallback.select(workers, task);
      // hash: any stable string-to-integer hash
      return workers[hash(key) % workers.length];
    },
  };
}
```

Inside the `isTask` branch, `task.args` is typed as the moroutine's argument tuple (e.g. `[key: string, n: number]` for `increment`). See [`examples/worker-affinity`](examples/worker-affinity) for the full demo, including per-worker state that only stays consistent under affinity routing.
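
The `hash` helper above is not part of moroutine: any stable string-to-integer hash works. A minimal FNV-1a-style sketch over the key's UTF-16 code units:

```typescript
// Hypothetical helper for the affinity balancer: a stable, non-negative
// 32-bit hash, so `hash(key) % workers.length` always indexes a worker.
export function hash(key: string): number {
  let h = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < key.length; i++) {
    h ^= key.charCodeAt(i);
    h = Math.imul(h, 0x01000193); // FNV prime
  }
  return h >>> 0; // force unsigned so the modulo is safe
}
```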

### Dedicated Workers

Awaiting a task directly (without a pool) runs it on a dedicated worker thread, one per moroutine function.

```ts
const result = await add(3, 4); // runs on a dedicated worker for `add`
```

### Task-Args

Pass a task as an argument to another task. The result is resolved on the worker and cached, so it never crosses back to the main thread. This is useful for non-transferable context like a database connection.

```ts
// db.ts
import { DatabaseSync } from 'node:sqlite';
import { mo } from 'moroutine';

export const openDb = mo(import.meta, (filename: string): DatabaseSync => {
  return new DatabaseSync(filename);
});

export const exec = mo(import.meta, (db: DatabaseSync, sql: string): void => {
  db.exec(sql);
});

export const query = mo(import.meta, (db: DatabaseSync, sql: string): unknown[] => {
  return db.prepare(sql).all();
});
```

```ts
import { workers } from 'moroutine';
import { openDb, exec, query } from './db.ts';

const db = openDb(':memory:');

{
  using run = workers(1);
  await run(exec(db, `CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)`));
  await run(exec(db, `INSERT INTO users (name) VALUES ('Alice')`));
  const rows = await run(query(db, 'SELECT * FROM users')); // [{ id: 1, name: 'Alice' }]
}
```

`openDb()` returns a `Task<DatabaseSync>`, and `exec()`/`query()` accept it in place of `DatabaseSync`. The database is opened once on the worker and reused for every subsequent call — the main thread never touches it.

## Shared Memory

### Descriptors and `shared()`

Shared-memory types are created with descriptor functions or the `shared()` allocator.

```ts
import { shared, int32, int64, int32atomic, bool, mutex, rwlock, string, bytes } from 'moroutine';
```

#### Primitives

```ts
const counter = int32(); // standalone Int32
const flag = bool(); // standalone Bool
const big = int64(); // standalone Int64 (bigint)
```

#### Atomics

Atomic variants use `Atomics.*` for thread-safe operations without a lock.

```ts
const counter = int32atomic();
counter.add(1); // atomic increment, returns previous value
counter.load(); // atomic read
```

Full atomic operations: `load`, `store`, `add`, `sub`, `and`, `or`, `xor`, `exchange`, `compareExchange`.
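
These mirror the underlying `Atomics.*` semantics: in particular, the read-modify-write operations return the value held *before* the update. The same convention is visible with plain Node `Atomics` (no moroutine involved):

```typescript
// Plain-Atomics illustration of the return-value convention.
const view = new Int32Array(new SharedArrayBuffer(4));

const before = Atomics.add(view, 0, 1); // 0: value before the increment
const after = Atomics.load(view, 0); // 1

// compareExchange returns the old value; the store happens only on a match.
const old = Atomics.compareExchange(view, 0, 1, 42); // 1: matched, stored 42
console.log(before, after, old, Atomics.load(view, 0)); // 0 1 1 42
```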

#### Structs

Plain objects in `shared()` create structs backed by a single `SharedArrayBuffer`.

```ts
const point = shared({ x: int32, y: int32 });

point.load(); // { x: 0, y: 0 }
point.store({ x: 10, y: 20 });
point.fields.x.store(10); // direct field access
```

Structs nest:

```ts
const rect = shared({
  pos: { x: int32, y: int32 },
  size: { w: int32, h: int32 },
});
```

#### Tuples

Arrays in `shared()` create fixed-length tuples.

```ts
const pair = shared([int32, bool]);
pair.load(); // [0, false]
pair.store([42, true]);
pair.elements[0].store(99);
```

#### Bytes and Strings

```ts
const buf = bytes(32); // fixed 32-byte buffer
buf.store(new Uint8Array(32)); // exact length required
buf.load(); // Readonly<Uint8Array> view
buf.view[0] = 0xff; // direct mutable access

const name = string(64); // UTF-8, max 64 bytes
name.store('hello');
name.load(); // 'hello'
```
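
The string limit is a byte budget, not a character count, so multi-byte UTF-8 text uses more of it. A hypothetical pre-flight check using Node's standard `Buffer.byteLength`:

```typescript
// Sketch: verify a value fits a shared string's UTF-8 byte budget.
export function fitsBudget(value: string, maxBytes: number): boolean {
  return Buffer.byteLength(value, 'utf8') <= maxBytes;
}

console.log(fitsBudget('hello', 64)); // true: 5 bytes
console.log(fitsBudget('héllo', 5)); // false: 'é' encodes to 2 bytes, 6 total
```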

#### Value Shorthand

Primitive values in schemas infer their type.

```ts
shared(0); // Int32 initialized to 0
shared(true); // Bool initialized to true
shared(0n); // Int64 initialized to 0n
shared({ x: 10, y: 20 }); // struct with Int32 fields
```

### Locks

#### Mutex

```ts
const mu = mutex();

using guard = await mu.lock();
// exclusive access
// auto-unlocks when guard is disposed

// or manually:
await mu.lock();
mu.unlock();
```

#### RwLock

```ts
const rw = rwlock();

using readGuard = await rw.readLock(); // multiple readers OK
using writeGuard = await rw.writeLock(); // exclusive access
```

### Using with Workers

Shared-memory types pass through `postMessage` automatically. They're reconstructed on the worker side with the same shared backing memory.

```ts
// update-position.ts
import { mo } from 'moroutine';
import type { Mutex, SharedStruct, Int32 } from 'moroutine';

type Position = SharedStruct<{ x: Int32; y: Int32 }>;

export const updatePosition = mo(
  import.meta,
  async (mu: Mutex, pos: Position, dx: number, dy: number): Promise<void> => {
    using guard = await mu.lock();
    const current = pos.load();
    pos.store({ x: current.x + dx, y: current.y + dy });
  },
);
```

```ts
// main.ts
import { workers, shared, int32, mutex } from 'moroutine';
import { updatePosition } from './update-position.ts';

const mu = mutex();
const pos = shared({ x: int32, y: int32 });

{
  using run = workers(4);
  await run([updatePosition(mu, pos, 1, 0), updatePosition(mu, pos, 0, 1)]);
}

console.log(pos.load()); // { x: 1, y: 1 }
```

## Streaming

### Streaming Moroutines

Wrap an `async function*` with `mo()` to create a streaming moroutine. Values are streamed between threads via `MessageChannel` with pause/resume backpressure.

```ts
// count.ts
import { mo } from 'moroutine';

export const countUp = mo(import.meta, async function* (n: number) {
  for (let i = 0; i < n; i++) {
    yield i;
  }
});
```

Iterate directly (dedicated worker) or dispatch via a pool:

```ts
import { workers } from 'moroutine';
import { countUp } from './count.ts';

// Dedicated worker
for await (const n of countUp(5)) {
  console.log(n); // 0, 1, 2, 3, 4
}

// Worker pool
{
  using run = workers(2);
  for await (const n of run(countUp(5))) {
    console.log(n); // 0, 1, 2, 3, 4
  }
}
```

### `channel()` and Fan-out

When you pass the same `AsyncIterable` or streaming task argument to multiple tasks, each task gets its own copy of the data. Use `channel()` to share a single source across multiple workers — each item goes to exactly one consumer (work stealing).

```ts
import { workers, channel, assign, mo } from 'moroutine';

const generate = mo(import.meta, async function* (n: number) {
  for (let i = 0; i < n; i++) yield i;
});

const process = mo(import.meta, async (input: AsyncIterable<number>): Promise<number[]> => {
  const items: number[] = [];
  for await (const n of input) items.push(n);
  return items;
});
```

```ts
const ch = channel(generate(100));

{
  using run = workers();
  const fanout = run.workers.map((w) => {
    return assign(w, process(ch));
  });
  const results = await run(fanout);
  // Items distributed across workers — no duplicates, no gaps
}
```

Use `assign(worker, task)` to pin a task to a specific worker. `run.workers` is a read-only array of worker handles, one per pool worker.

Without `channel()`, `AsyncIterable` and streaming task arguments are auto-detected and streamed to a single consumer. `channel()` is only needed for fan-out.

### `map()` — Bounded Fan-out

Dispatch a stream of tasks to a pool with bounded concurrency, yielding results in completion order. Wrap each task with `inert()` so it passes through the stream as-is instead of being auto-awaited.

```ts
// main.ts
import { readdir } from 'node:fs/promises';
import { join } from 'node:path';
import { workers, map, inert } from 'moroutine';
import type { Task } from 'moroutine';
import { hashFile, type FileHash } from './hash-file.ts';

{
  using run = workers();
  for await (const { path, hash } of map(run, walk('./src'), { concurrency: 4 })) {
    console.log(`${hash.slice(0, 12)} ${path}`);
  }
}

async function* walk(dir: string): AsyncGenerator<Task<FileHash>> {
  for (const entry of await readdir(dir, { withFileTypes: true })) {
    const p = join(dir, entry.name);
    if (entry.isDirectory()) {
      yield* walk(p);
    } else {
      yield inert(hashFile(p));
    }
  }
}
```

```ts
// hash-file.ts
import { readFile } from 'node:fs/promises';
import { createHash } from 'node:crypto';
import { mo } from 'moroutine';

export type FileHash = { path: string; hash: string };

export const hashFile = mo(import.meta, async (path: string): Promise<FileHash> => {
  const buf = await readFile(path);
  return { path, hash: createHash('sha256').update(buf).digest('hex') };
});
```

`map()` accepts a sync iterable, async iterable, or generator of tasks. `concurrency` caps in-flight dispatches (default `1`). Mixed task types unify: `map` over `Task<string> | Task<number>` yields `string | number`. An optional `signal` aborts iteration — and, since moroutine auto-transfers `AbortSignal` args, the same signal passed to tasks will also cancel in-flight work.
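
The ordering and concurrency behaviour can be pictured with a small self-contained sketch over promise thunks. This mirrors the observable semantics of `map()` (bounded in-flight work, results in completion order); it is not the library's implementation:

```typescript
// Sketch: consume thunks with at most `concurrency` in flight,
// yielding each result as soon as it completes.
export async function* boundedMap<T>(
  sources: Iterable<() => Promise<T>>,
  concurrency = 1,
): AsyncGenerator<T> {
  type Done = { value: T; task: Promise<Done> };
  const inFlight = new Set<Promise<Done>>();
  for (const start of sources) {
    while (inFlight.size >= concurrency) {
      const done = await Promise.race(inFlight); // first to finish wins
      inFlight.delete(done.task);
      yield done.value;
    }
    const task: Promise<Done> = start().then((value) => ({ value, task }));
    inFlight.add(task);
  }
  while (inFlight.size > 0) {
    // Drain the tail, still in completion order.
    const done = await Promise.race(inFlight);
    inFlight.delete(done.task);
    yield done.value;
  }
}
```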

### Pipelines

Chain streaming moroutines by passing one as an argument to the next. Each stage runs on its own dedicated worker.

```ts
// generate, double, square: streaming moroutines defined with mo()
const doubled = double(generate(5));
const squared = square(doubled);
for await (const n of squared) {
  console.log(n);
}
```
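
For intuition, the same shape with plain async generators (no moroutine, no workers; the moroutine version simply moves each stage onto its own thread):

```typescript
// Sketch of the pipeline's data flow on a single thread.
async function* generate(n: number): AsyncGenerator<number> {
  for (let i = 0; i < n; i++) yield i;
}

async function* double(input: AsyncIterable<number>): AsyncGenerator<number> {
  for await (const n of input) yield n * 2;
}

async function* square(input: AsyncIterable<number>): AsyncGenerator<number> {
  for await (const n of input) yield n * n;
}

for await (const n of square(double(generate(5)))) {
  console.log(n); // 0, 4, 16, 36, 64
}
```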

## Transfers

Use `transfer()` for zero-copy movement of `ArrayBuffer`, `TypedArray`, `MessagePort`, or streams.

```ts
import { transfer } from 'moroutine';

const buf = new ArrayBuffer(1024);
await run(processData(transfer(buf)));
// buf is now detached (zero-length) — ownership moved to worker
```

Return values from workers are auto-transferred when possible.
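
Detachment here is the standard structured-clone transfer behaviour; the same effect is visible with plain `structuredClone` (no moroutine involved):

```typescript
// Plain-Node illustration: after a transfer, the source buffer is
// detached (zero length) and the receiver owns the memory.
const source = new ArrayBuffer(1024);
const moved = structuredClone(source, { transfer: [source] });

console.log(source.byteLength); // 0: detached
console.log(moved.byteLength); // 1024: ownership moved
```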

## Examples

All examples require Node v24+ and can be run directly, e.g. `node examples/primes/main.ts`.

- [`examples/primes`](examples/primes) — CPU-bound prime checking on a dedicated worker
- [`examples/non-blocking`](examples/non-blocking) — main thread stays responsive during heavy computation
- [`examples/parallel-batch`](examples/parallel-batch) — sequential vs parallel batch processing
- [`examples/atomics`](examples/atomics) — shared atomic counter across workers
- [`examples/shared-state`](examples/shared-state) — mutex-protected shared struct
- [`examples/multi-module`](examples/multi-module) — moroutines from multiple modules on one worker
- [`examples/transfer`](examples/transfer) — zero-copy buffer transfer to and from a worker
- [`examples/sqlite`](examples/sqlite) — shared SQLite database on a worker via task-arg caching
- [`examples/pipeline`](examples/pipeline) — streaming pipeline across dedicated workers
- [`examples/channel-fanout`](examples/channel-fanout) — fan-out a channel to multiple workers via work stealing
- [`examples/bounded-map`](examples/bounded-map) — recursive tree walk hashing files with `map()`
- [`examples/load-balancing`](examples/load-balancing) — round-robin vs least-busy with variable-cost tasks
- [`examples/worker-affinity`](examples/worker-affinity) — custom balancer using `isTask()` to route tasks by key
- [`examples/benchmark`](examples/benchmark) — roundtrip channel throughput with 1–N workers