Offload functions to worker threads with shared memory primitives for Node.js.
docs: reorganize README sections, complete examples list

- Move Server Threads and Streaming above Shared Memory
- Move Transfers beneath Pipelines (before Shared Memory)
- Move Graceful Shutdown and Load Balancing to their own sections
- Add server-threads and benchmark-dispatch to examples list
- Change -- to - in examples list

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+264 -262
README.md
···

### Dedicated Workers

Awaiting a task directly (without a pool) runs it on a dedicated worker thread, one per moroutine function.

```ts
const result = await add(3, 4); // runs on a dedicated worker for `add`
```

### Task-Args

Pass a task as an argument to another task. The result is resolved on the worker and cached, so it never crosses back to the main thread. This is useful for non-transferable context like a database connection.

```ts
// db.ts
import { DatabaseSync } from 'node:sqlite';
import { mo } from 'moroutine';

export const openDb = mo(import.meta, (filename: string): DatabaseSync => {
  return new DatabaseSync(filename);
});

export const exec = mo(import.meta, (db: DatabaseSync, sql: string): void => {
  db.exec(sql);
});

export const query = mo(import.meta, (db: DatabaseSync, sql: string): unknown[] => {
  return db.prepare(sql).all();
});
```

```ts
import { workers } from 'moroutine';
import { openDb, exec, query } from './db.ts';

const db = openDb(':memory:');

{
  using run = workers(1);
  await run(exec(db, `CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)`));
  await run(exec(db, `INSERT INTO users (name) VALUES ('Alice')`));
  const rows = await run(query(db, 'SELECT * FROM users')); // [{ id: 1, name: 'Alice' }]
}
```

`openDb()` returns a `Task<DatabaseSync>`, and `exec()`/`query()` accept it in place of `DatabaseSync`. The database is opened once on the worker and reused for every subsequent call — the main thread never touches it.

## Server Threads

Scale a Node HTTP server across worker threads by fanning out socket fds. Main accepts TCP connections; workers each run their own `http.Server` and handle requests. Imported from the `moroutine/serve` subpath.
```ts
// app-server.ts
import { createServer } from 'node:http';
import { mo } from 'moroutine';
import { listen, type ListenArgs } from 'moroutine/serve';

export const runServer = mo(import.meta, async (...args: ListenArgs): Promise<void> => {
  const srv = createServer((req, res) => {
    res.writeHead(200);
    res.end('hello');
  });
  await listen(srv, ...args);
});
```

```ts
// main.ts
import { once } from 'node:events';
import { createServer } from 'node:net';
import { workers, assign } from 'moroutine';
import { serverThreads } from 'moroutine/serve';
import { runServer } from './app-server.ts';

const server = createServer();

{
  await using run = workers(4);
  using threads = serverThreads(run.workers, server);
  const fanout = threads.map(([w, args]) => assign(w, runServer(...args)));
  void run(fanout);

  await once(server.listen(3000), 'listening');
  console.log('Listening on :3000');
  await once(process, 'SIGINT');
  server.close();
}
```

Connection routing defaults to `leastConns()`; pass `{ balance: roundRobin() }` for simple fan-out. TLS termination runs on workers (`https.createServer` / `tls.createServer` inside your moroutine); the main-thread listener stays raw TCP. POSIX-only (Linux, macOS). Frameworks that expose an underlying `http.Server` compose without glue:

```ts
// express
const srv = createServer(express());
await listen(srv, ...args);

// fastify
const app = fastify();
await app.ready();
await listen(app.server, ...args);
```

Options:

```ts
serverThreads(run.workers, server, {
  balance: leastConns(), // or roundRobin(); default: leastConns()
  highWaterMark: 64, // per-worker cross-thread buffer (default 64)
  listen: { drainTimeout: 30_000 }, // per-worker drain budget on shutdown
});
```

`highWaterMark` bounds how many fds are in-transit to each worker's MessagePort at once. When a worker can't keep up, the cross-thread pipe parks until it catches up.

Graceful shutdown: call `server.close()` on the main listener; the cascade closes worker channels, drains in-flight requests (up to `drainTimeout`, default 30s), and resolves the moroutines. Combine with `await using run = workers(...)` for full pool teardown.

See [`examples/server-threads`](examples/server-threads) for a runnable demo.

## Streaming

### Streaming Moroutines

Wrap an `async function*` with `mo()` to create a streaming moroutine. Values are streamed between threads via `MessageChannel` with atomics-based backpressure — the producer parks when `highWaterMark` items (default 16) sit in flight and resumes when the consumer drains below the cap.

```ts
// count.ts
import { mo } from 'moroutine';

export const countUp = mo(import.meta, async function* (n: number) {
  for (let i = 0; i < n; i++) {
    yield i;
  }
});
```

Iterate directly (dedicated worker) or dispatch via a pool:

```ts
import { workers } from 'moroutine';
import { countUp } from './count.ts';

// Dedicated worker
for await (const n of countUp(5)) {
  console.log(n); // 0, 1, 2, 3, 4
}

// Worker pool
{
  using run = workers(2);
  for await (const n of run(countUp(5))) {
    console.log(n); // 0, 1, 2, 3, 4
  }
}
```

### `channel()` and Fan-out

When you pass the same `AsyncIterable` or streaming task argument to multiple tasks, each task gets its own copy of the data. Use `channel()` to share a single source across multiple workers — each item goes to exactly one consumer (work stealing).
```ts
import { workers, channel, assign, mo } from 'moroutine';

const generate = mo(import.meta, async function* (n: number) {
  for (let i = 0; i < n; i++) yield i;
});

const process = mo(import.meta, async (input: AsyncIterable<number>): Promise<number[]> => {
  const items: number[] = [];
  for await (const n of input) items.push(n);
  return items;
});
```

```ts
const ch = channel(generate(100));

{
  using run = workers();
  const fanout = run.workers.map((w) => {
    return assign(w, process(ch));
  });
  const results = await run(fanout);
  // Items distributed across workers — no duplicates, no gaps
}
```

Use `assign(worker, task)` to pin a task to a specific worker. `run.workers` is a read-only array of worker handles, one per pool worker.

Without `channel()`, `AsyncIterable` and streaming task arguments are auto-detected and streamed to a single consumer. `channel()` is only needed for fan-out.

### `map()` — Bounded Fan-out

Dispatch a stream of tasks to a pool with bounded concurrency, yielding results in completion order. Wrap each task with `inert()` so it passes through the stream as-is instead of being auto-awaited.
```ts
// main.ts
import { readdir } from 'node:fs/promises';
import { join } from 'node:path';
import { workers, map, inert } from 'moroutine';
import type { Task } from 'moroutine';
import { hashFile, type FileHash } from './hash-file.ts';

{
  using run = workers();
  for await (const { path, hash } of map(run, walk('./src'), { concurrency: 4 })) {
    console.log(`${hash.slice(0, 12)}  ${path}`);
  }
}

async function* walk(dir: string): AsyncGenerator<Task<FileHash>> {
  for (const entry of await readdir(dir, { withFileTypes: true })) {
    const p = join(dir, entry.name);
    if (entry.isDirectory()) {
      yield* walk(p);
    } else {
      yield inert(hashFile(p));
    }
  }
}
```

```ts
// hash-file.ts
import { readFile } from 'node:fs/promises';
import { createHash } from 'node:crypto';
import { mo } from 'moroutine';

export type FileHash = { path: string; hash: string };

export const hashFile = mo(import.meta, async (path: string): Promise<FileHash> => {
  const buf = await readFile(path);
  return { path, hash: createHash('sha256').update(buf).digest('hex') };
});
```

`map()` accepts a sync iterable, async iterable, or generator of tasks. `concurrency` caps in-flight dispatches (default `1`). Mixed task types unify: `map` over `Task<string> | Task<number>` yields `string | number`. An optional `signal` aborts iteration — and, since moroutine auto-transfers `AbortSignal` args, the same signal passed to tasks will also cancel in-flight work.

### Pipelines

Chain streaming moroutines by passing one as an argument to the next. Each stage runs on its own dedicated worker.

```ts
const doubled = double(generate(5));
const squared = square(doubled);
for await (const n of squared) {
  console.log(n);
}
```

## Transfers

Use `transfer()` for zero-copy movement of `ArrayBuffer`, `TypedArray`, `MessagePort`, or streams.

```ts
import { transfer } from 'moroutine';

const buf = new ArrayBuffer(1024);
await run(processData(transfer(buf)));
// buf is now detached (zero-length) — ownership moved to worker
```

Return values from workers are auto-transferred when possible.

## Shared Memory

···

```ts
// …
console.log(pos.load()); // { x: 1, y: 1 }
```

## Graceful Shutdown

Use `await using` for graceful async shutdown. The pool exposes a `signal` that fires when disposal begins — thread it into tasks for cooperative cancellation.

```ts
{
  await using run = workers(4);

  run(longTask(run.signal)); // task can react to abort
  run(otherTask()); // runs to completion
}
// signal fired, waited for both tasks, then workers terminated
```

Use `using` (without `await`) for immediate termination, same as before.
A `shutdownTimeout` option force-terminates workers if graceful shutdown takes too long:

```ts
{
  await using run = workers(4, { shutdownTimeout: 5000 });
  // ...
}
```

## Load Balancing

The pool uses round-robin scheduling by default. Pass a `balance` option to change the strategy:

```ts
import { workers, leastBusy } from 'moroutine';

{
  using run = workers(4, { balance: leastBusy() });
  // tasks dispatched to whichever worker has the fewest in-flight tasks
}
```

Built-in balancers:

- `roundRobin()` - cycles through workers in order (default)
- `leastBusy()` - picks the worker with the lowest active task count
Custom balancers implement the `Balancer` interface:

```ts
import type { Balancer, WorkerHandle, Task } from 'moroutine';

const random: Balancer = {
  select(workers: readonly WorkerHandle[], task: Task) {
    return workers[Math.floor(Math.random() * workers.length)];
  },
};
```

Each `WorkerHandle` exposes `activeCount` (in-flight tasks) and `thread` (the underlying `worker_threads.Worker`) for building custom strategies.

`isTask(moroutine, task)` narrows a task to the descriptor type produced by a specific moroutine — useful inside a balancer to route by task kind or by a key in the args. For example, a worker-affinity balancer can hash a shard key out of the args so that every call for the same key hits the worker that already has its state loaded:

```ts
import { isTask, roundRobin } from 'moroutine';
import type { Balancer } from 'moroutine';
import { increment, read } from './counter.ts';

export function keyAffinity(): Balancer {
  const fallback = roundRobin();
  return {
    select(workers, task) {
      let key: string | undefined;
      if (isTask(increment, task)) key = task.args[0];
      else if (isTask(read, task)) key = task.args[0];
      if (key === undefined) return fallback.select(workers, task);
      return workers[hash(key) % workers.length];
    },
  };
}
```

Inside the `isTask` branch, `task.args` is typed as the moroutine's argument tuple (e.g. `[key: string, n: number]` for `increment`). See [`examples/worker-affinity`](examples/worker-affinity) for the full demo, including per-worker state that only stays consistent under affinity routing.

## Examples

All examples require Node v24+ and can be run directly, e.g. `node examples/primes/main.ts`.

- [`examples/primes`](examples/primes) - CPU-bound prime checking on a dedicated worker
- [`examples/non-blocking`](examples/non-blocking) - main thread stays responsive during heavy computation
- [`examples/parallel-batch`](examples/parallel-batch) - sequential vs parallel batch processing
- [`examples/atomics`](examples/atomics) - shared atomic counter across workers
- [`examples/shared-state`](examples/shared-state) - mutex-protected shared struct
- [`examples/multi-module`](examples/multi-module) - moroutines from multiple modules on one worker
- [`examples/transfer`](examples/transfer) - zero-copy buffer transfer to and from a worker
- [`examples/sqlite`](examples/sqlite) - shared SQLite database on a worker via task-arg caching
- [`examples/pipeline`](examples/pipeline) - streaming pipeline across dedicated workers
- [`examples/channel-fanout`](examples/channel-fanout) - fan-out a channel to multiple workers via work stealing
- [`examples/bounded-map`](examples/bounded-map) - recursive tree walk hashing files with `map()`
- [`examples/load-balancing`](examples/load-balancing) - round-robin vs least-busy with variable-cost tasks
- [`examples/worker-affinity`](examples/worker-affinity) - custom balancer using `isTask()` to route tasks by key
- [`examples/server-threads`](examples/server-threads) - scale an HTTP server across worker threads via fd-fanout
- [`examples/benchmark`](examples/benchmark) - roundtrip channel throughput with 1–N workers
- [`examples/benchmark-dispatch`](examples/benchmark-dispatch) - task dispatch overhead measurement