Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

test(serve): graceful shutdown and drain timeout

+82
+11
test/serve/fixtures/slow-server.ts
··· 1 + import { createServer } from 'node:http'; 2 + import { mo } from 'moroutine'; 3 + import { listen, type ListenArgs } from 'moroutine/serve'; 4 + 5 + /** Server whose response completes after `delayMs` — used to exercise drain paths. */ 6 + export const runSlowServer = mo(import.meta, async (delayMs: number, ...args: ListenArgs): Promise<void> => { 7 + const srv = createServer((req, res) => { 8 + setTimeout(() => res.end('done'), delayMs); 9 + }); 10 + await listen(srv, ...args); 11 + });
+71
test/serve/shutdown.test.ts
··· 1 + import { describe, it } from 'node:test'; 2 + import assert from 'node:assert/strict'; 3 + import { once } from 'node:events'; 4 + import { createServer as createNetServer } from 'node:net'; 5 + import { request } from 'node:http'; 6 + import { workers, assign } from 'moroutine'; 7 + import { serverThreads } from 'moroutine/serve'; 8 + import { runSlowServer } from './fixtures/slow-server.ts'; 9 + 10 + function httpGet(port: number): Promise<string> { 11 + return new Promise((resolve, reject) => { 12 + const req = request({ host: '127.0.0.1', port, path: '/' }, (res) => { 13 + const chunks: Buffer[] = []; 14 + res.on('data', (c) => chunks.push(Buffer.isBuffer(c) ? c : Buffer.from(c))); 15 + res.on('end', () => resolve(Buffer.concat(chunks).toString('utf8'))); 16 + }); 17 + req.on('error', reject); 18 + req.end(); 19 + }); 20 + } 21 + 22 + describe('graceful shutdown', () => { 23 + it('in-flight request completes before pool exits', { timeout: 15_000 }, async () => { 24 + const server = createNetServer(); 25 + server.listen(0); 26 + await once(server, 'listening'); 27 + const port = (server.address() as any).port; 28 + 29 + { 30 + using run = workers(1); 31 + using threads = serverThreads(run.workers, server); 32 + const serving = run(threads.map(([w, args]) => assign(w, runSlowServer(200, ...args)))); 33 + 34 + const responsePromise = httpGet(port); 35 + // Give the worker a moment to accept the connection. 36 + await new Promise((r) => setTimeout(r, 50)); 37 + server.close(); 38 + 39 + const body = await responsePromise; 40 + assert.equal(body, 'done'); 41 + await serving; 42 + } 43 + }); 44 + 45 + it('drainTimeout force-closes long-hung connections', { timeout: 15_000 }, async () => { 46 + const server = createNetServer(); 47 + server.listen(0); 48 + await once(server, 'listening'); 49 + const port = (server.address() as any).port; 50 + 51 + { 52 + using run = workers(1, { shutdownTimeout: 10_000 }); 53 + using threads = serverThreads(run.workers, server, { 54 + listen: { drainTimeout: 200 }, 55 + }); 56 + const serving = run(threads.map(([w, args]) => assign(w, runSlowServer(10_000, ...args)))); 57 + 58 + const t0 = Date.now(); 59 + const responsePromise = httpGet(port).catch(() => 'aborted'); 60 + await new Promise((r) => setTimeout(r, 50)); 61 + server.close(); 62 + 63 + const result = await responsePromise; 64 + const elapsed = Date.now() - t0; 65 + // Either request aborted or succeeded very quickly (well before 10s delay). 66 + assert.ok(elapsed < 2000, `expected fast force-close, got ${elapsed}ms`); 67 + assert.equal(result, 'aborted'); 68 + await serving; 69 + } 70 + }); 71 + });