Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(serve): serverThreads() main-side adapter + basic e2e test

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+133
+1
src/serve/index.ts
··· 1 1 export type { ListenArgs, ListenOptions, Balance, ServerThreads, ServerThreadsOptions } from './types.ts'; 2 2 export { leastConns, roundRobin } from './strategies.ts'; 3 3 export { listen } from './listen.ts'; 4 + export { serverThreads } from './server-threads.ts';
+82
src/serve/server-threads.ts
··· 1 + import { openSync } from 'node:fs'; 2 + import type { Server } from 'node:net'; 3 + import { channel } from '../channel.ts'; 4 + import { shared } from '../shared/shared.ts'; 5 + import { int32atomic } from '../shared/descriptors.ts'; 6 + import type { WorkerHandle } from '../runner.ts'; 7 + import { PushChannel } from './push-channel.ts'; 8 + import { leastConns } from './strategies.ts'; 9 + import type { ListenArgs, ListenOptions, ServerThreads, ServerThreadsOptions } from './types.ts'; 10 + 11 + const DEFAULT_HIGH_WATER_MARK = 64; 12 + const DEFAULT_DRAIN_TIMEOUT = 30_000; 13 + 14 + export function serverThreads( 15 + workers: readonly WorkerHandle[], 16 + server: Server, 17 + opts: ServerThreadsOptions = {}, 18 + ): ServerThreads { 19 + const balance = opts.balance ?? leastConns(); 20 + const _hwm = opts.highWaterMark ?? DEFAULT_HIGH_WATER_MARK; 21 + const listenOpts: Required<ListenOptions> = { 22 + drainTimeout: opts.listen?.drainTimeout ?? DEFAULT_DRAIN_TIMEOUT, 23 + }; 24 + 25 + const n = workers.length; 26 + if (n === 0) throw new Error('serverThreads: workers array must be non-empty'); 27 + 28 + // Per-worker fd push channels (each is an AsyncIterable<number>). 29 + const pushChannels = Array.from({ length: n }, () => new PushChannel<number>()); 30 + // Wrap each PushChannel in moroutine's channel() so it is recognized by 31 + // moroutine's arg-preparation layer and piped cross-thread via MessagePort. 32 + const channels = pushChannels.map((pc) => channel<number>(pc)); 33 + // Shared atomic counter slots, one per worker. 34 + const counters = shared(Array.from({ length: n }, () => int32atomic)); 35 + 36 + const onConnection = (socket: any) => { 37 + const origFd: number = socket._handle?.fd; 38 + if (typeof origFd !== 'number') { 39 + // Windows or unexpected handle type — drop hard. 40 + socket.destroy(); 41 + return; 42 + } 43 + // Snapshot counters, pick worker. 44 + const snapshot: number[] = new Array(n); 45 + for (let i = 0; i < n; i++) snapshot[i] = counters.elements[i].load(); 46 + const idx = balance.pick(snapshot); 47 + 48 + // Dup the fd via /dev/fd so the new descriptor is independent of libuv's 49 + // event loop tracking. This lets socket.destroy() properly close the 50 + // original fd (freeing libuv state) while the dup'd fd survives for the 51 + // worker to open as a fresh Socket. 52 + const fd = openSync(`/dev/fd/${origFd}`, 'r+'); 53 + socket.destroy(); 54 + 55 + counters.elements[idx].add(1); 56 + try { 57 + pushChannels[idx].send(fd); 58 + } catch { 59 + // send threw because the channel is closed — undo the increment. 60 + counters.elements[idx].sub(1); 61 + } 62 + }; 63 + 64 + server.on('connection', onConnection); 65 + 66 + let disposed = false; 67 + const dispose = () => { 68 + if (disposed) return; 69 + disposed = true; 70 + server.off('connection', onConnection); 71 + server.off('close', onClose); 72 + for (const pc of pushChannels) pc.close(); 73 + }; 74 + const onClose = () => dispose(); 75 + server.once('close', onClose); 76 + 77 + // Build the indexable/iterable/disposable result. 78 + const entries = workers.map((w, i) => [w, [channels[i], counters, i, listenOpts] as ListenArgs] as const); 79 + const result: readonly (readonly [WorkerHandle, ListenArgs])[] = entries; 80 + Object.defineProperty(result, Symbol.dispose, { value: dispose, enumerable: false }); 81 + return result as unknown as ServerThreads; 82 + }
+39
test/serve/basic.test.ts
··· 1 + import { describe, it } from 'node:test'; 2 + import assert from 'node:assert/strict'; 3 + import { once } from 'node:events'; 4 + import { createServer as createNetServer } from 'node:net'; 5 + import { request } from 'node:http'; 6 + import { workers, assign } from 'moroutine'; 7 + import { serverThreads } from 'moroutine/serve'; 8 + import { runBasicServer } from './fixtures/basic-server.ts'; 9 + 10 + function httpGet(port: number): Promise<string> { 11 + return new Promise((resolve, reject) => { 12 + const req = request({ host: '127.0.0.1', port, method: 'GET', path: '/' }, (res) => { 13 + const chunks: Buffer[] = []; 14 + res.on('data', (c) => chunks.push(Buffer.isBuffer(c) ? c : Buffer.from(c))); 15 + res.on('end', () => resolve(Buffer.concat(chunks).toString('utf8'))); 16 + }); 17 + req.on('error', reject); 18 + req.end(); 19 + }); 20 + } 21 + 22 + describe('serverThreads (basic e2e)', () => { 23 + it('main accepts, worker responds', { timeout: 10_000 }, async () => { 24 + const server = createNetServer(); 25 + server.listen(0); 26 + await once(server, 'listening'); 27 + const port = (server.address() as any).port; 28 + 29 + { 30 + using run = workers(1); 31 + using threads = serverThreads(run.workers, server); 32 + const serving = run(threads.map(([w, args]) => assign(w, runBasicServer(...args)))); 33 + const body = await httpGet(port); 34 + assert.equal(body, 'hello from worker'); 35 + server.close(); 36 + await serving; 37 + } 38 + }); 39 + });
+11
test/serve/fixtures/basic-server.ts
··· 1 + import { createServer } from 'node:http'; 2 + import { mo } from 'moroutine'; 3 + import { listen, type ListenArgs } from 'moroutine/serve'; 4 + 5 + export const runBasicServer = mo(import.meta, async (...args: ListenArgs): Promise<void> => { 6 + const srv = createServer((req, res) => { 7 + res.writeHead(200, { 'Content-Type': 'text/plain' }); 8 + res.end('hello from worker'); 9 + }); 10 + await listen(srv, ...args); 11 + });