Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor(serve): rename Balance → ServerThreadBalancer; pass ServerThread

The balancer now receives ServerThread[] ({ worker, conns }) and returns
the selected thread — disambiguates from moroutine's core Balancer
interface and gives custom balancers access to WorkerHandle properties.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+68 -45
+8 -1
src/serve/index.ts
··· 1 - export type { ListenArgs, ListenOptions, Balance, ServerThreads, ServerThreadsOptions } from './types.ts'; 1 + export type { 2 + ListenArgs, 3 + ListenOptions, 4 + ServerThread, 5 + ServerThreadBalancer, 6 + ServerThreads, 7 + ServerThreadsOptions, 8 + } from './types.ts'; 2 9 export { leastConns, roundRobin } from './strategies.ts'; 3 10 export { listen } from './listen.ts'; 4 11 export { serverThreads } from './server-threads.ts';
+6 -5
src/serve/server-threads.ts
··· 6 6 import type { WorkerHandle } from '../runner.ts'; 7 7 import { PushChannel } from './push-channel.ts'; 8 8 import { leastConns } from './strategies.ts'; 9 - import type { ListenArgs, ListenOptions, ServerThreads, ServerThreadsOptions } from './types.ts'; 9 + import type { ListenArgs, ListenOptions, ServerThread, ServerThreads, ServerThreadsOptions } from './types.ts'; 10 10 11 11 const DEFAULT_HIGH_WATER_MARK = 64; 12 12 const DEFAULT_DRAIN_TIMEOUT = 30_000; ··· 40 40 socket.destroy(); 41 41 return; 42 42 } 43 - // Snapshot counters, pick worker. 44 - const snapshot: number[] = new Array(n); 45 - for (let i = 0; i < n; i++) snapshot[i] = counters.elements[i].load(); 46 - const idx = balance.pick(snapshot); 43 + // Snapshot per-worker state, pick target. 44 + const threads: ServerThread[] = new Array(n); 45 + for (let i = 0; i < n; i++) threads[i] = { worker: workers[i], conns: counters.elements[i].load() }; 46 + const selected = balance.select(threads); 47 + const idx = workers.indexOf(selected.worker); 47 48 48 49 // Dup the fd via /dev/fd so the new descriptor is independent of libuv's 49 50 // event loop tracking. This lets socket.destroy() properly close the
+15 -17
src/serve/strategies.ts
··· 1 - import type { Balance } from './types.ts'; 1 + import type { ServerThreadBalancer } from './types.ts'; 2 2 3 - /** Pick the worker with the fewest active connections; ties go to the lowest index. */ 4 - export function leastConns(): Balance { 3 + /** Pick the thread with the fewest active connections; ties go to the first. */ 4 + export function leastConns(): ServerThreadBalancer { 5 5 return { 6 - pick(counters) { 7 - let bestIdx = 0; 8 - let bestCount = counters[0]; 9 - for (let i = 1; i < counters.length; i++) { 10 - if (counters[i] < bestCount) { 11 - bestIdx = i; 12 - bestCount = counters[i]; 6 + select(threads) { 7 + let best = threads[0]; 8 + for (let i = 1; i < threads.length; i++) { 9 + if (threads[i].conns < best.conns) { 10 + best = threads[i]; 13 11 } 14 12 } 15 - return bestIdx; 13 + return best; 16 14 }, 17 15 }; 18 16 } 19 17 20 - /** Cycle through worker indices in order, ignoring counts. */ 21 - export function roundRobin(): Balance { 18 + /** Cycle through threads in order, ignoring connection counts. */ 19 + export function roundRobin(): ServerThreadBalancer { 22 20 let cursor = 0; 23 21 return { 24 - pick(counters) { 25 - const idx = cursor % counters.length; 26 - cursor = (cursor + 1) % counters.length; 27 - return idx; 22 + select(threads) { 23 + const idx = cursor % threads.length; 24 + cursor = (cursor + 1) % threads.length; 25 + return threads[idx]; 28 26 }, 29 27 }; 30 28 }
+10 -4
src/serve/types.ts
··· 13 13 */ 14 14 export type ListenArgs = readonly [fds: AsyncIterable<number>, counter: Int32Atomic, opts: Required<ListenOptions>]; 15 15 16 - /** Connection-routing strategy. Picks a worker index given a counter snapshot. */ 17 - export interface Balance { 18 - pick(counters: readonly number[]): number; 16 + /** Per-worker state snapshot passed to the balancer. */ 17 + export interface ServerThread { 18 + worker: WorkerHandle; 19 + conns: number; 20 + } 21 + 22 + /** Connection-routing strategy. Selects a thread to receive the next connection. */ 23 + export interface ServerThreadBalancer { 24 + select(threads: readonly ServerThread[]): ServerThread; 19 25 } 20 26 21 27 export interface ServerThreadsOptions { 22 28 /** Routing strategy. Default: `leastConns()`. */ 23 - balance?: Balance; 29 + balance?: ServerThreadBalancer; 24 30 /** Options forwarded to each worker's `listen()` call. */ 25 31 listen?: ListenOptions; 26 32 /** Per-worker fd-channel buffer size. Default 64. */
+29 -18
test/serve/strategies.test.ts
··· 1 1 import { describe, it } from 'node:test'; 2 2 import assert from 'node:assert/strict'; 3 3 import { leastConns, roundRobin } from 'moroutine/serve'; 4 + import type { ServerThread } from 'moroutine/serve'; 5 + 6 + function threads(...conns: number[]): ServerThread[] { 7 + return conns.map((c, i) => ({ worker: { id: i } as any, conns: c })); 8 + } 4 9 5 10 describe('leastConns', () => { 6 - it('picks the index with the lowest count', () => { 11 + it('picks the thread with the lowest count', () => { 7 12 const b = leastConns(); 8 - assert.equal(b.pick([5, 2, 7]), 1); 9 - assert.equal(b.pick([0, 0, 0]), 0); // ties → first min 10 - assert.equal(b.pick([3, 3, 1]), 2); 13 + const t = threads(5, 2, 7); 14 + assert.strictEqual(b.select(t), t[1]); 15 + const t2 = threads(0, 0, 0); 16 + assert.strictEqual(b.select(t2), t2[0]); // ties → first 17 + const t3 = threads(3, 3, 1); 18 + assert.strictEqual(b.select(t3), t3[2]); 11 19 }); 12 20 13 21 it('is stateless across calls', () => { 14 22 const b = leastConns(); 15 - assert.equal(b.pick([1, 0, 0]), 1); 16 - assert.equal(b.pick([1, 0, 0]), 1); // same input → same result 23 + const t = threads(1, 0, 0); 24 + assert.strictEqual(b.select(t), t[1]); 25 + assert.strictEqual(b.select(t), t[1]); // same input → same result 17 26 }); 18 27 }); 19 28 20 29 describe('roundRobin', () => { 21 - it('cycles through indices in order', () => { 30 + it('cycles through threads in order', () => { 22 31 const b = roundRobin(); 23 - const counters = [0, 0, 0, 0]; 24 - assert.equal(b.pick(counters), 0); 25 - assert.equal(b.pick(counters), 1); 26 - assert.equal(b.pick(counters), 2); 27 - assert.equal(b.pick(counters), 3); 28 - assert.equal(b.pick(counters), 0); 32 + const t = threads(0, 0, 0, 0); 33 + assert.strictEqual(b.select(t), t[0]); 34 + assert.strictEqual(b.select(t), t[1]); 35 + assert.strictEqual(b.select(t), t[2]); 36 + assert.strictEqual(b.select(t), t[3]); 37 + assert.strictEqual(b.select(t), t[0]); 29 38 }); 30 39 31 - it('ignores counter values', () => { 40 + it('ignores conns values', () => { 32 41 const b = roundRobin(); 33 - assert.equal(b.pick([100, 0, 0]), 0); // does not pick the idle one 34 - assert.equal(b.pick([0, 100, 0]), 1); 42 + const t = threads(100, 0, 0); 43 + assert.strictEqual(b.select(t), t[0]); // does not pick the idle one 44 + assert.strictEqual(b.select(t), t[1]); 35 45 }); 36 46 37 47 it('each instance has independent state', () => { 38 48 const b1 = roundRobin(); 39 49 const b2 = roundRobin(); 40 - b1.pick([0, 0]); // b1 cursor: 1 41 - assert.equal(b2.pick([0, 0]), 0); // b2 starts fresh 50 + const t = threads(0, 0); 51 + b1.select(t); // b1 cursor: 1 52 + assert.strictEqual(b2.select(t), t[0]); // b2 starts fresh 42 53 }); 43 54 });