Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

test(serve): leastConns spreads load across workers

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

+63
+49
test/serve/distribution.test.ts
··· 1 + import { describe, it } from 'node:test'; 2 + import assert from 'node:assert/strict'; 3 + import { once } from 'node:events'; 4 + import { createServer as createNetServer } from 'node:net'; 5 + import { request } from 'node:http'; 6 + import { workers, assign } from 'moroutine'; 7 + import { serverThreads, leastConns } from 'moroutine/serve'; 8 + import { runCountingServer } from './fixtures/counting-server.ts'; 9 + 10 + function httpGet(port: number): Promise<string> { 11 + return new Promise((resolve, reject) => { 12 + const req = request({ host: '127.0.0.1', port, path: '/' }, (res) => { 13 + const chunks: Buffer[] = []; 14 + res.on('data', (c) => chunks.push(Buffer.isBuffer(c) ? c : Buffer.from(c))); 15 + res.on('end', () => resolve(Buffer.concat(chunks).toString('utf8'))); 16 + }); 17 + req.on('error', reject); 18 + req.end(); 19 + }); 20 + } 21 + 22 + describe('leastConns distribution', () => { 23 + it('spreads 20 concurrent slow requests across 4 workers', { timeout: 30_000 }, async () => { 24 + const server = createNetServer(); 25 + server.listen(0); 26 + await once(server, 'listening'); 27 + const port = (server.address() as any).port; 28 + 29 + { 30 + using run = workers(4); 31 + using threads = serverThreads(run.workers, server, { balance: leastConns() }); 32 + const serving = run(threads.map(([w, args], i) => assign(w, runCountingServer(String(i), 100, ...args)))); 33 + 34 + const responses = await Promise.all(Array.from({ length: 20 }, () => httpGet(port))); 35 + const counts = new Map<string, number>(); 36 + for (const r of responses) counts.set(r, (counts.get(r) ?? 0) + 1); 37 + 38 + // Each worker should handle some requests, and no worker should get all of them. 39 + assert.equal(counts.size, 4, `expected all 4 workers used, got ${[...counts.keys()]}`); 40 + for (const [who, n] of counts) { 41 + assert.ok(n >= 2, `worker ${who} handled too few: ${n}`); 42 + assert.ok(n <= 10, `worker ${who} handled too many: ${n}`); 43 + } 44 + 45 + server.close(); 46 + await serving; 47 + } 48 + }); 49 + });
+14
test/serve/fixtures/counting-server.ts
··· 1 + import { createServer } from 'node:http'; 2 + import { mo } from 'moroutine'; 3 + import { listen, type ListenArgs } from 'moroutine/serve'; 4 + 5 + /** Responds with `worker:<id>` identifying which worker handled the request. */ 6 + export const runCountingServer = mo( 7 + import.meta, 8 + async (workerId: string, holdMs: number, ...args: ListenArgs): Promise<void> => { 9 + const srv = createServer((req, res) => { 10 + setTimeout(() => res.end(`worker:${workerId}`), holdMs); 11 + }); 12 + await listen(srv, ...args); 13 + }, 14 + );