Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor(serve): simplify ListenArgs — pass single Int32Atomic counter

Instead of passing the entire counters tuple + slot index, pass the
individual Int32Atomic for this worker's slot. Simpler signature,
fewer args, less indirection in listen().

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+16 -23
+6 -8
src/serve/listen.ts
··· 1 1 import { Socket } from 'node:net'; 2 2 import type { Server } from 'node:net'; 3 - import type { Tuple } from '../shared/tuple.ts'; 4 3 import type { Int32Atomic } from '../shared/int32-atomic.ts'; 5 4 import type { ListenOptions } from './types.ts'; 6 5 7 6 /** 8 - * Drain fds from `ch` and emit each as a `'connection'` on `server`. Maintains 9 - * the per-worker counter (decrements on each socket's 'close' event). Resolves 10 - * after the channel ends and either: (a) all active sockets have closed, or 11 - * (b) `opts.drainTimeout` elapses — in which case any remaining sockets are 7 + * Drain fds from the iterable and emit each as a `'connection'` on `server`. 8 + * Decrements `counter` on each socket's 'close' event. Resolves after the 9 + * iterable ends and either: (a) all active sockets have closed, or 10 + * (b) `opts.drainTimeout` elapses — in which case remaining sockets are 12 11 * force-destroyed. 13 12 */ 14 13 export async function listen( 15 14 server: Server, 16 15 fds: AsyncIterable<number>, 17 - counters: Tuple<Int32Atomic[]>, 18 - slot: number, 16 + counter: Int32Atomic, 19 17 opts: Required<ListenOptions>, 20 18 ): Promise<void> { 21 19 const active = new Set<Socket>(); ··· 35 33 active.add(socket); 36 34 socket.once('close', () => { 37 35 active.delete(socket); 38 - counters.elements[slot].sub(1); 36 + counter.sub(1); 39 37 tryResolve(); 40 38 }); 41 39 server.emit('connection', socket);
+1 -1
src/serve/server-threads.ts
··· 76 76 server.once('close', onClose); 77 77 78 78 // Build the indexable/iterable/disposable result. 79 - const entries = workers.map((w, i) => [w, [channels[i], counters, i, listenOpts] as ListenArgs] as const); 79 + const entries = workers.map((w, i) => [w, [channels[i], counters.elements[i], listenOpts] as ListenArgs] as const); 80 80 const result: readonly (readonly [WorkerHandle, ListenArgs])[] = entries; 81 81 Object.defineProperty(result, Symbol.dispose, { value: dispose, enumerable: false }); 82 82 return result as unknown as ServerThreads;
+1 -7
src/serve/types.ts
··· 1 - import type { Tuple } from '../shared/tuple.ts'; 2 1 import type { Int32Atomic } from '../shared/int32-atomic.ts'; 3 2 import type { WorkerHandle } from '../runner.ts'; 4 3 ··· 12 11 * Opaque tuple passed from `serverThreads` to each worker's moroutine. 13 12 * The shape may evolve; users only ever spread it into `listen()`. 14 13 */ 15 - export type ListenArgs = readonly [ 16 - fds: AsyncIterable<number>, 17 - counters: Tuple<Int32Atomic[]>, 18 - slot: number, 19 - opts: Required<ListenOptions>, 20 - ]; 14 + export type ListenArgs = readonly [fds: AsyncIterable<number>, counter: Int32Atomic, opts: Required<ListenOptions>]; 21 15 22 16 /** Connection-routing strategy. Picks a worker index given a counter snapshot. */ 23 17 export interface Balance {
+8 -7
test/serve/listen.test.ts
··· 4 4 import { openSync } from 'node:fs'; 5 5 import { createServer as createNetServer, connect } from 'node:net'; 6 6 import { createServer } from 'node:http'; 7 - import { shared, int32atomic } from 'moroutine'; 7 + import { int32atomic } from 'moroutine'; 8 + import { Int32Atomic } from '../../src/shared/int32-atomic.ts'; 8 9 import { pushChannel } from '../../src/serve/push-channel.ts'; 9 10 import { listen } from 'moroutine/serve'; 10 11 ··· 35 36 describe('listen()', () => { 36 37 it('emits received fds as connections on the user server', { timeout: 5000 }, async () => { 37 38 const ch = pushChannel<number>({ highWaterMark: 4 }); 38 - const counters = shared([int32atomic]); 39 + const counter = new Int32Atomic(); 39 40 const http = createServer((req, res) => { 40 41 res.end('ok'); 41 42 }); 42 - const drained = listen(http, ch, counters, 0, { drainTimeout: 5_000 }); 43 + const drained = listen(http, ch, counter, { drainTimeout: 5_000 }); 43 44 44 45 const { fd, close: closeSrc } = await acquireLocalFd(); 45 46 ch.send(fd); ··· 53 54 54 55 it('decrements counter on socket close', { timeout: 5000 }, async () => { 55 56 const ch = pushChannel<number>({ highWaterMark: 4 }); 56 - const counters = shared([int32atomic]); 57 - counters.elements[0].store(1); // simulate main having incremented 57 + const counter = new Int32Atomic(); 58 + counter.store(1); // simulate main having incremented 58 59 const http = createServer((req, res) => { 59 60 res.end('ok'); 60 61 }); 61 - const drained = listen(http, ch, counters, 0, { drainTimeout: 5_000 }); 62 + const drained = listen(http, ch, counter, { drainTimeout: 5_000 }); 62 63 63 64 const { fd, close: closeSrc } = await acquireLocalFd(); 64 65 ch.send(fd); 65 66 await new Promise((r) => setTimeout(r, 50)); 66 67 closeSrc(); // destroy client so socket closes → counter decrements 67 68 await new Promise((r) => setTimeout(r, 100)); 68 - assert.equal(counters.elements[0].load(), 0); 69 + assert.equal(counter.load(), 0); 69 70 ch.close(); 70 71 await drained; 71 72 });