Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

docs: add worker-affinity example using isTask()

A custom Balancer routes tasks to workers by hashing a shard key
extracted from task args via isTask() narrowing. Demonstrates how
per-worker state (a Map in this demo) stays consistent when calls
for the same key are pinned to the same worker.

Output contrasts round-robin (counts split across workers, wrong
totals) with keyAffinity (counts consistent, correct totals).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+82 -1
+2 -1
README.md
··· 476 476 - [`examples/sqlite`](examples/sqlite) -- shared SQLite database on a worker via task-arg caching 477 477 - [`examples/pipeline`](examples/pipeline) -- streaming pipeline across dedicated workers 478 478 - [`examples/channel-fanout`](examples/channel-fanout) -- fan-out a channel to multiple workers via work stealing 479 - - [`examples/bounded-map`](examples/bounded-map) -- bounded-concurrency fan-out with `map()` and a shared abort signal 479 + - [`examples/bounded-map`](examples/bounded-map) -- recursive tree walk hashing files with `map()` 480 480 - [`examples/load-balancing`](examples/load-balancing) -- round-robin vs least-busy with variable-cost tasks 481 + - [`examples/worker-affinity`](examples/worker-affinity) -- custom balancer using `isTask()` to route tasks by key 481 482 - [`examples/benchmark`](examples/benchmark) -- roundtrip channel throughput with 1–N workers
+16
examples/worker-affinity/counter.ts
··· 1 + import { mo } from '../../src/index.ts'; 2 + 3 + // Each worker thread loads this module and gets its own `counts` map. 4 + // Affinity routing keeps all operations on a given key on the same worker 5 + // so the count is consistent. 6 + const counts = new Map<string, number>(); 7 + 8 + export const increment = mo(import.meta, (key: string, n: number): number => { 9 + const next = (counts.get(key) ?? 0) + n; 10 + counts.set(key, next); 11 + return next; 12 + }); 13 + 14 + export const read = mo(import.meta, (key: string): number => { 15 + return counts.get(key) ?? 0; 16 + });
+26
examples/worker-affinity/key-affinity.ts
··· 1 + import { isTask, roundRobin } from '../../src/index.ts'; 2 + import type { Balancer, Task, WorkerHandle } from '../../src/index.ts'; 3 + import { increment, read } from './counter.ts'; 4 + 5 + // Routes tasks to workers based on a key in the task's args, so that 6 + // per-worker state (caches, connections, compiled resources) can be reused 7 + // across successive calls for the same key. Falls back to round-robin for 8 + // any task that doesn't carry a routable key. 9 + export function keyAffinity(): Balancer { 10 + const fallback = roundRobin(); 11 + return { 12 + select(workers: readonly WorkerHandle[], task: Task): WorkerHandle { 13 + let key: string | undefined; 14 + if (isTask(increment, task)) key = task.args[0]; 15 + else if (isTask(read, task)) key = task.args[0]; 16 + if (key === undefined) return fallback.select(workers, task); 17 + return workers[hash(key) % workers.length]; 18 + }, 19 + }; 20 + } 21 + 22 + function hash(s: string): number { 23 + let h = 0; 24 + for (let i = 0; i < s.length; i++) h = (h * 31 + s.charCodeAt(i)) | 0; 25 + return h >>> 0; 26 + }
+38
examples/worker-affinity/main.ts
··· 1 + // Worker affinity via a custom balancer + isTask(). 2 + // 3 + // Each worker keeps its own in-memory counter map. The keyAffinity balancer 4 + // uses isTask() to inspect a task's args and route by a consistent hash of 5 + // the key, so successive operations on the same key always hit the worker 6 + // that already has that key's state. 7 + // 8 + // Requires Node v24+. 9 + // Run: node examples/worker-affinity/main.ts 10 + 11 + import { workers, roundRobin } from '../../src/index.ts'; 12 + import type { Balancer } from '../../src/index.ts'; 13 + import { increment, read } from './counter.ts'; 14 + import { keyAffinity } from './key-affinity.ts'; 15 + 16 + const operations = [ 17 + ['a', 1], 18 + ['b', 1], 19 + ['a', 2], 20 + ['c', 1], 21 + ['b', 3], 22 + ['a', 1], 23 + ['c', 4], 24 + ['b', 1], 25 + ] as const; 26 + const expected = { a: 4, b: 5, c: 5 }; 27 + 28 + async function demo(label: string, balance: Balancer) { 29 + using run = workers(4, { balance }); 30 + await run(operations.map(([k, n]) => increment(k, n))); 31 + const [a, b, c] = await run([read('a'), read('b'), read('c')]); 32 + const got = { a, b, c }; 33 + const ok = got.a === expected.a && got.b === expected.b && got.c === expected.c; 34 + console.log(`${label.padEnd(16)} got=${JSON.stringify(got)} ${ok ? 'OK' : 'WRONG (counts split across workers)'}`); 35 + } 36 + 37 + await demo('round-robin', roundRobin()); 38 + await demo('keyAffinity()', keyAffinity());