Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(test): eliminate race in shutdown tests

The previous 50ms setTimeout was unreliable — the worker may not have
booted and started consuming fds yet. Now:
- in-flight test: waits for HTTP response headers (worker is mid-flight)
- drainTimeout test: waits for TCP connect + 100ms grace for fd transit

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+55 -10
+3 -3
test/channel-fanout.test.ts
··· 67 67 lengths.reduce((a, b) => a + b, 0), 68 68 4000, 69 69 ); 70 - // Skip-based RR doesn't guarantee even splits under heterogeneous drain 71 - // rates, but no consumer should receive zero items. 70 + // Skip-based RR guarantees each consumer receives at least its initial 71 + // high-water-mark fill (default 16) before faster consumers pull ahead. 72 72 const min = Math.min(...lengths); 73 - assert.ok(min > 0, `expected each consumer > 0 items, got ${lengths.join(',')}`); 73 + assert.ok(min >= 16, `expected each consumer ≥ 16 items, got ${lengths.join(',')}`); 74 74 }); 75 75 });
+52 -7
test/serve/shutdown.test.ts
··· 19 19 }); 20 20 } 21 21 22 + function httpGetWithSignal(port: number): { body: Promise<string>; responded: Promise<void> } { 23 + let onResponded: () => void; 24 + const responded = new Promise<void>((r) => { 25 + onResponded = r; 26 + }); 27 + const body = new Promise<string>((resolve, reject) => { 28 + const req = request({ host: '127.0.0.1', port, path: '/' }, (res) => { 29 + onResponded!(); 30 + const chunks: Buffer[] = []; 31 + res.on('data', (c) => chunks.push(Buffer.isBuffer(c) ? c : Buffer.from(c))); 32 + res.on('end', () => resolve(Buffer.concat(chunks).toString('utf8'))); 33 + }); 34 + req.on('error', reject); 35 + req.end(); 36 + }); 37 + return { body, responded }; 38 + } 39 + 40 + function httpGetWithConnect(port: number): { body: Promise<string>; connected: Promise<void> } { 41 + let onConnected: () => void; 42 + const connected = new Promise<void>((r) => { 43 + onConnected = r; 44 + }); 45 + const body = new Promise<string>((resolve, reject) => { 46 + const req = request({ host: '127.0.0.1', port, path: '/' }, (res) => { 47 + const chunks: Buffer[] = []; 48 + res.on('data', (c) => chunks.push(Buffer.isBuffer(c) ? c : Buffer.from(c))); 49 + res.on('end', () => resolve(Buffer.concat(chunks).toString('utf8'))); 50 + }); 51 + req.on('socket', (sock) => { 52 + sock.on('connect', () => onConnected!()); 53 + }); 54 + req.on('error', (e) => reject(e)); 55 + req.end(); 56 + }); 57 + return { body: body.catch(() => 'aborted'), connected }; 58 + } 59 + 22 60 describe('graceful shutdown', () => { 23 61 it('in-flight request completes before pool exits', { timeout: 15_000 }, async () => { 24 62 const server = createNetServer(); ··· 31 69 using threads = serverThreads(run.workers, server); 32 70 const serving = run(threads.map(([w, args]) => assign(w, runSlowServer(200, ...args)))); 33 71 34 - const responsePromise = httpGet(port); 35 - // Give the worker a moment to accept the connection. 36 - await new Promise((r) => setTimeout(r, 50)); 72 + // Wait for the worker to actually begin handling: send the request and 73 + // wait for the response to start (headers received = worker is mid-flight). 74 + const { body, responded } = httpGetWithSignal(port); 75 + await responded; 37 76 server.close(); 38 77 39 - const body = await responsePromise; 40 - assert.equal(body, 'done'); 78 + assert.equal(await body, 'done'); 41 79 await serving; 42 80 } 43 81 }); ··· 56 94 const serving = run(threads.map(([w, args]) => assign(w, runSlowServer(10_000, ...args)))); 57 95 58 96 const t0 = Date.now(); 59 - const responsePromise = httpGet(port).catch(() => 'aborted'); 60 - await new Promise((r) => setTimeout(r, 50)); 97 + // Use httpGetWithSignal — the slow server won't respond for 10s, but 98 + // once the worker accepts, it sends headers (which fires `responded`). 99 + // Actually, slow-server only responds after delay, so headers won't 100 + // arrive until 10s. Instead, wait for the TCP socket to connect, then 101 + // give a small grace for the worker to accept the fd. 102 + const { body: responsePromise, connected } = httpGetWithConnect(port); 103 + await connected; 104 + // Small grace for the fd to transit to the worker and be opened. 105 + await new Promise((r) => setTimeout(r, 100)); 61 106 server.close(); 62 107 63 108 const result = await responsePromise;