Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(serve): listen() worker-side adapter with drain semantics

Also adds PushChannel<T>/pushChannel() as a push-based async-iterable
channel (with .send()/.close()), required by listen() and serverThreads.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

+209 -2
+1
src/index.ts
··· 4 4 export { AsyncIterableTask } from './stream-task.ts'; 5 5 export { channel } from './channel.ts'; 6 6 export type { ChannelOptions } from './channel.ts'; 7 + export { pushChannel, PushChannel } from './push-channel.ts'; 7 8 export { workers } from './worker-pool.ts'; 8 9 export { transfer } from './transfer.ts'; 9 10 export { assign } from './assign.ts';
+54
src/push-channel.ts
··· 1 + /** 2 + * A simple push-based async queue that implements AsyncIterable<T>. 3 + * Values are pushed via `.send(item)` and the iteration ends when `.close()` is called. 4 + */ 5 + export class PushChannel<T> implements AsyncIterable<T> { 6 + private readonly queue: T[] = []; 7 + private readonly resolvers: Array<(result: IteratorResult<T>) => void> = []; 8 + private closed = false; 9 + 10 + /** Push a value to waiting consumers (or enqueue if none are waiting). */ 11 + send(item: T): void { 12 + if (this.closed) throw new Error('PushChannel is closed'); 13 + if (this.resolvers.length > 0) { 14 + const resolve = this.resolvers.shift()!; 15 + resolve({ value: item, done: false }); 16 + } else { 17 + this.queue.push(item); 18 + } 19 + } 20 + 21 + /** Signal end-of-stream; iteration will finish after queued items. */ 22 + close(): void { 23 + this.closed = true; 24 + for (const resolve of this.resolvers.splice(0)) { 25 + resolve({ value: undefined as unknown as T, done: true }); 26 + } 27 + } 28 + 29 + [Symbol.asyncIterator](): AsyncIterator<T> { 30 + return { 31 + next: (): Promise<IteratorResult<T>> => { 32 + if (this.queue.length > 0) { 33 + return Promise.resolve({ value: this.queue.shift()!, done: false }); 34 + } 35 + if (this.closed) { 36 + return Promise.resolve({ value: undefined as unknown as T, done: true }); 37 + } 38 + return new Promise<IteratorResult<T>>((resolve) => { 39 + this.resolvers.push(resolve); 40 + }); 41 + }, 42 + return: (): Promise<IteratorResult<T>> => { 43 + return Promise.resolve({ value: undefined as unknown as T, done: true }); 44 + }, 45 + }; 46 + } 47 + } 48 + 49 + /** 50 + * Create a push-based channel with `.send(item)` and `.close()` for async iteration. 51 + */ 52 + export function pushChannel<T>(_opts?: { highWaterMark?: number }): PushChannel<T> { 53 + return new PushChannel<T>(); 54 + }
+1
src/serve/index.ts
··· 1 1 export type { ListenArgs, ListenOptions, Balance, ServerThreads, ServerThreadsOptions } from './types.ts'; 2 2 export { leastConns, roundRobin } from './strategies.ts'; 3 + export { listen } from './listen.ts';
+80
src/serve/listen.ts
··· 1 + import { Socket } from 'node:net'; 2 + import type { Server } from 'node:net'; 3 + import type { PushChannel } from '../push-channel.ts'; 4 + import type { Tuple } from '../shared/tuple.ts'; 5 + import type { Int32Atomic } from '../shared/int32-atomic.ts'; 6 + import type { ListenOptions } from './types.ts'; 7 + 8 + /** 9 + * Drain fds from `ch` and emit each as a `'connection'` on `server`. Maintains 10 + * the per-worker counter (decrements on each socket's 'close' event). Resolves 11 + * after the channel ends and either: (a) all active sockets have closed, or 12 + * (b) `opts.drainTimeout` elapses — in which case any remaining sockets are 13 + * force-destroyed. 14 + */ 15 + export async function listen( 16 + server: Server, 17 + ch: PushChannel<number>, 18 + counters: Tuple<Int32Atomic[]>, 19 + slot: number, 20 + opts: Required<ListenOptions>, 21 + ): Promise<void> { 22 + const active = new Set<Socket>(); 23 + let onDrained: (() => void) | null = null; 24 + 25 + const tryResolve = () => { 26 + if (active.size === 0 && onDrained) { 27 + const cb = onDrained; 28 + onDrained = null; 29 + cb(); 30 + } 31 + }; 32 + 33 + for await (const fd of ch) { 34 + const socket = new Socket({ fd, readable: true, writable: true }); 35 + socket.setNoDelay(true); 36 + active.add(socket); 37 + socket.once('close', () => { 38 + active.delete(socket); 39 + counters.elements[slot].sub(1); 40 + tryResolve(); 41 + }); 42 + server.emit('connection', socket); 43 + } 44 + 45 + // Channel closed; begin drain. 46 + const httpSrv = server as Server & { 47 + closeIdleConnections?: () => void; 48 + closeAllConnections?: () => void; 49 + }; 50 + httpSrv.closeIdleConnections?.(); 51 + 52 + await new Promise<void>((resolve) => { 53 + if (active.size === 0) return resolve(); 54 + 55 + onDrained = resolve; 56 + 57 + const timer = setTimeout(() => { 58 + // Force-close all remaining sockets directly. 59 + for (const socket of active) { 60 + socket.destroy(); 61 + } 62 + // Also try the server-level helpers if available. 63 + httpSrv.closeAllConnections?.(); 64 + // Poll until active drains (socket 'close' events fire async after destroy). 65 + const poll = setInterval(() => { 66 + if (active.size === 0) { 67 + clearInterval(poll); 68 + onDrained = null; 69 + resolve(); 70 + } 71 + }, 10); 72 + }, opts.drainTimeout); 73 + 74 + // Override resolve to also clear the timer. 75 + onDrained = () => { 76 + clearTimeout(timer); 77 + resolve(); 78 + }; 79 + }); 80 + }
+2 -2
src/serve/types.ts
··· 1 - import type { Channel } from '../channel.ts'; 1 + import type { PushChannel } from '../push-channel.ts'; 2 2 import type { Tuple } from '../shared/tuple.ts'; 3 3 import type { Int32Atomic } from '../shared/int32-atomic.ts'; 4 4 import type { WorkerHandle } from '../runner.ts'; ··· 14 14 * The shape may evolve; users only ever spread it into `listen()`. 15 15 */ 16 16 export type ListenArgs = readonly [ 17 - channel: Channel<number>, 17 + channel: PushChannel<number>, 18 18 counters: Tuple<Int32Atomic[]>, 19 19 slot: number, 20 20 opts: Required<ListenOptions>,
+71
test/serve/listen.test.ts
··· 1 + import { describe, it } from 'node:test'; 2 + import assert from 'node:assert/strict'; 3 + import { once } from 'node:events'; 4 + import { openSync } from 'node:fs'; 5 + import { createServer as createNetServer, connect } from 'node:net'; 6 + import { createServer } from 'node:http'; 7 + import { pushChannel as channel, shared, int32atomic } from 'moroutine'; 8 + import { listen } from 'moroutine/serve'; 9 + 10 + // Helper: accept a real TCP connection on a scratch listener and yield its fd. 11 + // We dup the fd via /dev/fd so that the new fd is not registered in libuv's 12 + // event loop — this lets new Socket({ fd }) succeed in the same thread. 13 + async function acquireLocalFd(): Promise<{ fd: number; close: () => void }> { 14 + const srv = createNetServer(); 15 + srv.listen(0); 16 + await once(srv, 'listening'); 17 + const port = (srv.address() as any).port; 18 + const client = connect(port); 19 + const [peer] = (await once(srv, 'connection')) as [any]; 20 + const origFd: number = peer._handle.fd; 21 + const fd = openSync(`/dev/fd/${origFd}`, 'r+'); // dup — new fd, not tracked by libuv 22 + peer.pause(); 23 + peer._handle = null; 24 + peer.destroy(); 25 + return { 26 + fd, 27 + close: () => { 28 + srv.close(); 29 + client.destroy(); 30 + }, 31 + }; 32 + } 33 + 34 + describe('listen()', () => { 35 + it('emits received fds as connections on the user server', { timeout: 5000 }, async () => { 36 + const ch = channel<number>({ highWaterMark: 4 }); 37 + const counters = shared([int32atomic]); 38 + const http = createServer((req, res) => { 39 + res.end('ok'); 40 + }); 41 + const drained = listen(http, ch, counters, 0, { drainTimeout: 5_000 }); 42 + 43 + const { fd, close: closeSrc } = await acquireLocalFd(); 44 + ch.send(fd); 45 + // Wait briefly for the socket to be emitted, then destroy the client so 46 + // the connection closes and drained can resolve. 47 + await new Promise((r) => setTimeout(r, 50)); 48 + closeSrc(); // destroy client so socket closes naturally 49 + ch.close(); 50 + await drained; 51 + }); 52 + 53 + it('decrements counter on socket close', { timeout: 5000 }, async () => { 54 + const ch = channel<number>({ highWaterMark: 4 }); 55 + const counters = shared([int32atomic]); 56 + counters.elements[0].store(1); // simulate main having incremented 57 + const http = createServer((req, res) => { 58 + res.end('ok'); 59 + }); 60 + const drained = listen(http, ch, counters, 0, { drainTimeout: 5_000 }); 61 + 62 + const { fd, close: closeSrc } = await acquireLocalFd(); 63 + ch.send(fd); 64 + await new Promise((r) => setTimeout(r, 50)); 65 + closeSrc(); // destroy client so socket closes → counter decrements 66 + await new Promise((r) => setTimeout(r, 100)); 67 + assert.equal(counters.elements[0].load(), 0); 68 + ch.close(); 69 + await drained; 70 + }); 71 + });