Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(serve): close dup'd fd on channel-close race; wire highWaterMark

- closeSync(fd) in the catch path prevents fd leak when a PushChannel
is already closed during shutdown.
- Pass highWaterMark through to channel() so backpressure is enforced
at the cross-thread pipe layer.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+5 -4
+5 -4
src/serve/server-threads.ts
··· 1 - import { openSync } from 'node:fs'; 1 + import { closeSync, openSync } from 'node:fs'; 2 2 import type { Server } from 'node:net'; 3 3 import { channel } from '../channel.ts'; 4 4 import { shared } from '../shared/shared.ts'; ··· 17 17 opts: ServerThreadsOptions = {}, 18 18 ): ServerThreads { 19 19 const balance = opts.balance ?? leastConns(); 20 - const _hwm = opts.highWaterMark ?? DEFAULT_HIGH_WATER_MARK; 20 + const hwm = opts.highWaterMark ?? DEFAULT_HIGH_WATER_MARK; 21 21 const listenOpts: Required<ListenOptions> = { 22 22 drainTimeout: opts.listen?.drainTimeout ?? DEFAULT_DRAIN_TIMEOUT, 23 23 }; ··· 29 29 const pushChannels = Array.from({ length: n }, () => new PushChannel<number>()); 30 30 // Wrap each PushChannel in moroutine's channel() so it is recognized by 31 31 // moroutine's arg-preparation layer and piped cross-thread via MessagePort. 32 - const channels = pushChannels.map((pc) => channel<number>(pc)); 32 + const channels = pushChannels.map((pc) => channel<number>(pc, { highWaterMark: hwm })); 33 33 // Shared atomic counter slots, one per worker. 34 34 const counters = shared(Array.from({ length: n }, () => int32atomic)); 35 35 ··· 56 56 try { 57 57 pushChannels[idx].send(fd); 58 58 } catch { 59 - // send threw because the channel is closed — undo the increment. 59 + // send threw because the channel is closed — undo the increment and close the fd. 60 60 counters.elements[idx].sub(1); 61 + closeSync(fd); 61 62 } 62 63 }; 63 64