Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(serve): replace /dev/fd dup with readStop+detach for Linux compat

openSync('/proc/self/fd/N') returns ENXIO on sockets on Linux (including
NixOS). The new approach deregisters libuv's read watcher via readStop()
and no-ops the handle's close() method, then nulls _handle and destroys
the JS socket. The fd survives for the worker to open independently.

Works on both macOS and Linux without filesystem-based dup.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+27 -28
+10 -12
src/serve/server-threads.ts
··· 1 - import { closeSync, openSync } from 'node:fs'; 2 - import { platform } from 'node:process'; 3 1 import type { Server } from 'node:net'; 4 - 5 - const fdPath = (fd: number) => (platform === 'linux' ? `/proc/self/fd/${fd}` : `/dev/fd/${fd}`); 6 2 import { channel } from '../channel.ts'; 7 3 import { shared } from '../shared/shared.ts'; 8 4 import { int32atomic } from '../shared/descriptors.ts'; ··· 49 45 const selected = balance.select(threads); 50 46 const idx = workers.indexOf(selected.worker); 51 47 52 - // Dup the fd via /dev/fd so the new descriptor is independent of libuv's 53 - // event loop tracking. This lets socket.destroy() properly close the 54 - // original fd (freeing libuv state) while the dup'd fd survives for the 55 - // worker to open as a fresh Socket. 56 - const fd = openSync(fdPath(origFd), 'r+'); 48 + // Detach the socket from libuv without closing the fd. readStop() 49 + // deregisters the read watcher; no-oping close() prevents destroy() 50 + // from closing the underlying fd. The worker will open it fresh. 51 + const handle = socket._handle; 52 + handle.readStop(); 53 + handle.close = (cb: any) => { 54 + if (cb) cb(); 55 + }; 56 + socket._handle = null; 57 57 socket.destroy(); 58 58 59 59 counters.elements[idx].add(1); 60 60 try { 61 - pushChannels[idx].send(fd); 61 + pushChannels[idx].send(origFd); 62 62 } catch { 63 - // send threw because the channel is closed — undo the increment and close the fd. 64 63 counters.elements[idx].sub(1); 65 - closeSync(fd); 66 64 } 67 65 }; 68 66
+8 -9
test/serve/listen.test.ts
··· 1 1 import { describe, it } from 'node:test'; 2 2 import assert from 'node:assert/strict'; 3 3 import { once } from 'node:events'; 4 - import { openSync } from 'node:fs'; 5 - import { platform } from 'node:process'; 6 4 import { createServer as createNetServer, connect } from 'node:net'; 7 - 8 - const fdPath = (fd: number) => (platform === 'linux' ? `/proc/self/fd/${fd}` : `/dev/fd/${fd}`); 9 5 import { createServer } from 'node:http'; 10 6 import { int32atomic } from 'moroutine'; 11 7 import { Int32Atomic } from '../../src/shared/int32-atomic.ts'; ··· 13 9 import { listen } from 'moroutine/serve'; 14 10 15 11 // Helper: accept a real TCP connection on a scratch listener and yield its fd. 16 - // We dup the fd via /dev/fd so that the new fd is not registered in libuv's 17 - // event loop — this lets new Socket({ fd }) succeed in the same thread. 12 + // Detaches the socket from libuv (readStop + no-op close) so the fd can be 13 + // reused by listen() in the same thread without conflict. 18 14 async function acquireLocalFd(): Promise<{ fd: number; close: () => void }> { 19 15 const srv = createNetServer(); 20 16 srv.listen(0); ··· 22 18 const port = (srv.address() as any).port; 23 19 const client = connect(port); 24 20 const [peer] = (await once(srv, 'connection')) as [any]; 25 - const origFd: number = peer._handle.fd; 26 - const fd = openSync(fdPath(origFd), 'r+'); // dup — new fd, not tracked by libuv 27 - peer.pause(); 21 + const handle = peer._handle; 22 + const fd: number = handle.fd; 23 + handle.readStop(); 24 + handle.close = (cb: any) => { 25 + if (cb) cb(); 26 + }; 28 27 peer._handle = null; 29 28 peer.destroy(); 30 29 return {
+9 -7
test/serve/spike.test.ts
··· 1 1 import { describe, it } from 'node:test'; 2 2 import assert from 'node:assert/strict'; 3 3 import { once } from 'node:events'; 4 - import { openSync } from 'node:fs'; 5 - import { platform } from 'node:process'; 6 4 import { createServer, connect } from 'node:net'; 7 - 8 - const fdPath = (fd: number) => (platform === 'linux' ? `/proc/self/fd/${fd}` : `/dev/fd/${fd}`); 9 5 import { Worker } from 'node:worker_threads'; 10 6 11 7 describe('fd-passing spike', () => { ··· 25 21 26 22 // Main TCP server 27 23 const server = createServer(); 28 - server.on('connection', (socket) => { 29 - // Dup the fd via /dev/fd so the new descriptor is independent of libuv. 30 - const fd = openSync(fdPath((socket as any)._handle.fd), 'r+'); 24 + server.on('connection', (socket: any) => { 25 + const handle = socket._handle; 26 + const fd: number = handle.fd; 27 + // Deregister libuv's read interest; no-op close to preserve the fd. 28 + handle.readStop(); 29 + handle.close = (cb: any) => { 30 + if (cb) cb(); 31 + }; 32 + socket._handle = null; 31 33 socket.destroy(); 32 34 worker.postMessage({ fd }); 33 35 });