Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

ci allll branches

+98 -17
+1 -1
.tangled/workflows/ci.yaml
··· 1 1 when: 2 2 - event: ['push', 'pull_request'] 3 - branch: ['*'] 3 + branch: ['**'] 4 4 5 5 engine: nixery 6 6
+97 -16
test/serve/fd-dup-diag.test.ts
··· 68 68 server.close(); 69 69 }); 70 70 71 - it('alternative: dup via process.binding if /proc/self/fd fails', { timeout: 5000 }, async () => { 72 - // Test whether we can use a different dup mechanism 73 - try { 74 - const binding = (process as any).binding('spawn_sync'); 75 - console.log('spawn_sync binding available:', !!binding); 76 - } catch (e: any) { 77 - console.log('spawn_sync binding not available:', e.message); 78 - } 71 + it('alternative: detach handle without closing fd', { timeout: 5000 }, async () => { 72 + const { Worker } = await import('node:worker_threads'); 73 + 74 + const workerCode = ` 75 + const { parentPort } = require('node:worker_threads'); 76 + const { Socket } = require('node:net'); 77 + parentPort.on('message', ({ fd }) => { 78 + const sock = new Socket({ fd, readable: true, writable: true }); 79 + sock.setNoDelay(true); 80 + sock.write('hello from detach\\n'); 81 + sock.end(); 82 + }); 83 + `; 84 + const worker = new Worker(workerCode, { eval: true }); 85 + 86 + const server = createServer(); 87 + server.listen(0); 88 + await once(server, 'listening'); 89 + const port = (server.address() as any).port; 90 + 91 + server.on('connection', (socket: any) => { 92 + const handle = socket._handle; 93 + const fd: number = handle.fd; 94 + // Stop libuv from reading on this fd 95 + handle.readStop(); 96 + // Prevent socket.destroy() from closing the fd 97 + handle.close = (cb: any) => { 98 + if (cb) cb(); 99 + }; 100 + // Detach 101 + socket._handle = null; 102 + socket.destroy(); 103 + worker.postMessage({ fd }); 104 + }); 105 + 106 + const client = connect(port); 107 + const chunks: Buffer[] = []; 108 + client.on('data', (c: any) => chunks.push(Buffer.isBuffer(c) ? c : Buffer.from(c))); 109 + await once(client, 'end'); 110 + 111 + const result = Buffer.concat(chunks).toString('utf8'); 112 + console.log('detach approach result:', JSON.stringify(result)); 113 + assert.equal(result, 'hello from detach\n'); 114 + 115 + server.close(); 116 + await worker.terminate(); 117 + }); 118 + 119 + it('alternative: detach handle with bidirectional HTTP', { timeout: 10000 }, async () => { 120 + const { Worker } = await import('node:worker_threads'); 121 + const http = await import('node:http'); 122 + 123 + const workerCode = ` 124 + const { parentPort } = require('node:worker_threads'); 125 + const net = require('net'); 126 + const http = require('http'); 127 + const srv = http.createServer((req, res) => { res.end('bidi-ok'); }); 128 + parentPort.on('message', ({ fd }) => { 129 + const sock = new net.Socket({ fd, readable: true, writable: true }); 130 + sock.setNoDelay(true); 131 + srv.emit('connection', sock); 132 + }); 133 + `; 134 + const worker = new Worker(workerCode, { eval: true }); 79 135 80 - // Check if native addon-free dup is possible via other means 81 - try { 82 - const { execSync } = await import('node:child_process'); 83 - const result = execSync('ls /proc/self/fd/ 2>&1 || echo "NO_PROC"', { encoding: 'utf8' }); 84 - console.log('/proc/self/fd listing:', result.trim().slice(0, 200)); 85 - } catch (e: any) { 86 - console.log('ls /proc/self/fd failed:', e.message); 87 - } 136 + const server = createServer(); 137 + server.listen(0); 138 + await once(server, 'listening'); 139 + const port = (server.address() as any).port; 140 + 141 + server.on('connection', (socket: any) => { 142 + const handle = socket._handle; 143 + const fd: number = handle.fd; 144 + handle.readStop(); 145 + handle.close = (cb: any) => { 146 + if (cb) cb(); 147 + }; 148 + socket._handle = null; 149 + socket.destroy(); 150 + worker.postMessage({ fd }); 151 + }); 152 + 153 + // Full HTTP roundtrip 154 + const body = await new Promise<string>((resolve, reject) => { 155 + const req = http.request({ host: '127.0.0.1', port, path: '/' }, (res) => { 156 + const chunks: Buffer[] = []; 157 + res.on('data', (c: any) => chunks.push(Buffer.isBuffer(c) ? c : Buffer.from(c))); 158 + res.on('end', () => resolve(Buffer.concat(chunks).toString('utf8'))); 159 + }); 160 + req.on('error', reject); 161 + req.end(); 162 + }); 163 + 164 + console.log('bidi HTTP result:', JSON.stringify(body)); 165 + assert.equal(body, 'bidi-ok'); 166 + 167 + server.close(); 168 + await worker.terminate(); 88 169 }); 89 170 });