Mirror: A Node.js fetch shim using built-in Request, Response, and Headers (but without native fetch)
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: Propagate duplex stream errors and strictly forward early errors to Response stream (#16)

authored by

Phil Pluckthun and committed by
GitHub
18219203 9b43e99d

+181 -50
+5
.changeset/two-kids-cry.md
··· 1 + --- 2 + 'fetch-nodeshim': patch 3 + --- 4 + 5 + Propagate errors for duplex request/response streams, and ensure early errors propagate to the Response stream
+107
src/__tests__/fetch-errors.test.ts
··· 1 + import { describe, it, expect, beforeEach, afterEach } from 'vitest'; 2 + import { Readable } from 'node:stream'; 3 + 4 + import TestServer from './utils/server.js'; 5 + import { fetch } from '../fetch'; 6 + 7 + describe('fetch error handling', () => { 8 + const server = new TestServer(); 9 + 10 + beforeEach(() => server.start()); 11 + afterEach(() => server.stop()); 12 + 13 + describe('incoming stream errors', () => { 14 + it('should propagate error when connection resets mid-body', async () => { 15 + const url = server.mock((_req, res) => { 16 + res.writeHead(200, { 'Content-Length': '1000' }); 17 + res.write('partial'); 18 + setTimeout(() => res.destroy(), 10); 19 + }); 20 + 21 + const response = await fetch(url); 22 + expect(response.ok).toBe(true); 23 + await expect(response.text()).rejects.toThrow(); 24 + }); 25 + 26 + it('should not cause unhandled errors on incoming stream', async () => { 27 + const errors: Error[] = []; 28 + const handler = (e: Error) => errors.push(e); 29 + process.on('uncaughtException', handler); 30 + 31 + const url = server.mock((_req, res) => { 32 + res.writeHead(200, { 'Transfer-Encoding': 'chunked' }); 33 + res.write('data'); 34 + setTimeout(() => res.destroy(), 10); 35 + }); 36 + 37 + const response = await fetch(url); 38 + try { 39 + await response.text(); 40 + } catch {} 41 + await new Promise(r => setTimeout(r, 50)); 42 + 43 + process.off('uncaughtException', handler); 44 + expect(errors).toHaveLength(0); 45 + }); 46 + }); 47 + 48 + describe('request body pipeline errors', () => { 49 + it('should reject when request body stream errors', async () => { 50 + const body = new Readable({ 51 + read() { 52 + this.push('data'); 53 + setTimeout(() => this.destroy(new Error('stream error')), 10); 54 + }, 55 + }); 56 + 57 + const url = server.mock((req, res) => { 58 + req.on('data', () => {}); 59 + req.on('end', () => res.end('ok')); 60 + req.on('error', () => {}); 61 + }); 62 + 63 + await expect(fetch(url, { method: 'POST', body })).rejects.toThrow( 64 + 'stream error' 65 + ); 66 + }); 67 + }); 68 + 69 + describe('decompression errors', () => { 70 + it('should propagate gzip decompression errors', async () => { 71 + const url = server.mock((_req, res) => { 72 + res.writeHead(200, { 'Content-Encoding': 'gzip' }); 73 + res.end('not gzip'); 74 + }); 75 + 76 + const response = await fetch(url); 77 + expect(response.ok).toBe(true); 78 + await expect(response.text()).rejects.toThrow(); 79 + }); 80 + 81 + it('should propagate brotli decompression errors', async () => { 82 + const url = server.mock((_req, res) => { 83 + res.writeHead(200, { 'Content-Encoding': 'br' }); 84 + res.end('not brotli'); 85 + }); 86 + 87 + const response = await fetch(url); 88 + await expect(response.text()).rejects.toThrow(); 89 + }); 90 + }); 91 + 92 + describe('abort handling', () => { 93 + it('should abort during response streaming', async () => { 94 + const controller = new AbortController(); 95 + 96 + const url = server.mock((_req, res) => { 97 + res.writeHead(200, { 'Transfer-Encoding': 'chunked' }); 98 + const id = setInterval(() => res.write('x'), 10); 99 + res.on('close', () => clearInterval(id)); 100 + }); 101 + 102 + const response = await fetch(url, { signal: controller.signal }); 103 + setTimeout(() => controller.abort(), 30); 104 + await expect(response.text()).rejects.toThrow(); 105 + }); 106 + }); 107 + });
+7 -7
src/__tests__/fetch.test.ts
··· 595 595 }); 596 596 setTimeout(() => controller.abort(), 100); 597 597 await expect(response$).rejects.toThrowErrorMatchingInlineSnapshot( 598 - `[AbortError: The operation was aborted]` 598 + `[AbortError: This operation was aborted]` 599 599 ); 600 600 }); 601 601 ··· 613 613 ]; 614 614 setTimeout(() => controller.abort(), 100); 615 615 await expect(fetches[0]).rejects.toThrowErrorMatchingInlineSnapshot( 616 - `[AbortError: The operation was aborted]` 616 + `[AbortError: This operation was aborted]` 617 617 ); 618 618 await expect(fetches[1]).rejects.toThrowErrorMatchingInlineSnapshot( 619 - `[AbortError: The operation was aborted]` 619 + `[AbortError: This operation was aborted]` 620 620 ); 621 621 }); 622 622 ··· 627 627 signal: controller.signal, 628 628 }); 629 629 }).rejects.toThrowErrorMatchingInlineSnapshot( 630 - `[AbortError: The operation was aborted]` 630 + `[AbortError: This operation was aborted]` 631 631 ); 632 632 }); 633 633 ··· 639 639 await expect(() => 640 640 fetch(request) 641 641 ).rejects.toThrowErrorMatchingInlineSnapshot( 642 - `[AbortError: The operation was aborted]` 642 + `[AbortError: This operation was aborted]` 643 643 ); 644 644 }); 645 645 ··· 672 672 }); 673 673 controller.abort(); 674 674 await expect(response$).rejects.toThrowErrorMatchingInlineSnapshot( 675 - `[AbortError: The operation was aborted]` 675 + `[AbortError: This operation was aborted]` 676 676 ); 677 677 }); 678 678 ··· 708 708 controller.abort(); 709 709 await bodyError$; 710 710 await expect(response$).rejects.toMatchInlineSnapshot( 711 - `[AbortError: The operation was aborted]` 711 + `[AbortError: This operation was aborted]` 712 712 ); 713 713 }); 714 714
+40 -28
src/__tests__/utils/server.js
··· 3 3 import http from 'http'; 4 4 import zlib from 'zlib'; 5 5 import Busboy from 'busboy'; 6 - import {once} from 'events'; 6 + import { once } from 'events'; 7 7 8 8 export default class TestServer { 9 9 constructor() { ··· 42 42 return `http://${this.hostname}:${this.port}/mocked`; 43 43 } 44 44 45 + mock(handler) { 46 + this.server.nextResponseHandler = handler; 47 + return `http://${this.hostname}:${this.port}/mocked`; 48 + } 49 + 45 50 router(request, res) { 46 51 const p = request.url; 47 52 48 53 if (p === '/mocked') { 49 54 if (this.nextResponseHandler) { 50 - this.nextResponseHandler(res); 55 + this.nextResponseHandler(request, res); 51 56 this.nextResponseHandler = undefined; 52 57 } else { 53 - throw new Error('No mocked response. Use ’TestServer.mockResponse()’.'); 58 + throw new Error("No mocked response. Use 'TestServer.mockResponse()'."); 54 59 } 55 60 } 56 61 ··· 91 96 if (p === '/json') { 92 97 res.statusCode = 200; 93 98 res.setHeader('Content-Type', 'application/json'); 94 - res.end(JSON.stringify({ 95 - name: 'value' 96 - })); 99 + res.end( 100 + JSON.stringify({ 101 + name: 'value', 102 + }) 103 + ); 97 104 } 98 105 99 106 if (p === '/gzip') { ··· 244 251 } 245 252 246 253 if (p === '/redirect/301/rn') { 247 - res.statusCode = 301 248 - res.setHeader('Location', '/403') 254 + res.statusCode = 301; 255 + res.setHeader('Location', '/403'); 249 256 res.write('301 Permanently moved.\r\n'); 250 257 res.end(); 251 258 } ··· 321 328 if (p === '/redirect/chunked') { 322 329 res.writeHead(301, { 323 330 Location: '/inspect', 324 - 'Transfer-Encoding': 'chunked' 331 + 'Transfer-Encoding': 'chunked', 325 332 }); 326 333 setTimeout(() => res.end(), 10); 327 334 } ··· 349 356 } 350 357 351 358 if (p === '/error/premature') { 352 - res.writeHead(200, {'content-length': 50}); 359 + res.writeHead(200, { 'content-length': 50 }); 353 360 res.write('foo'); 354 361 setTimeout(() => { 355 362 res.destroy(); ··· 359 366 if (p === '/error/premature/chunked') { 360 367 res.writeHead(200, { 361 368 'Content-Type': 'application/json', 362 - 'Transfer-Encoding': 'chunked' 369 + 'Transfer-Encoding': 'chunked', 363 370 }); 364 371 365 - res.write(`${JSON.stringify({data: 'hi'})}\n`); 372 + res.write(`${JSON.stringify({ data: 'hi' })}\n`); 366 373 367 374 setTimeout(() => { 368 - res.write(`${JSON.stringify({data: 'bye'})}\n`); 375 + res.write(`${JSON.stringify({ data: 'bye' })}\n`); 369 376 }, 50); 370 377 371 378 setTimeout(() => { ··· 435 442 body += c; 436 443 }); 437 444 request.on('end', () => { 438 - res.end(JSON.stringify({ 439 - inspect: true, 440 - method: request.method, 441 - url: request.url, 442 - headers: request.headers, 443 - body 444 - })); 445 + res.end( 446 + JSON.stringify({ 447 + inspect: true, 448 + method: request.method, 449 + url: request.url, 450 + headers: request.headers, 451 + body, 452 + }) 453 + ); 445 454 }); 446 455 } 447 456 448 457 if (p === '/multipart') { 449 458 res.statusCode = 200; 450 459 res.setHeader('Content-Type', 'application/json'); 451 - const busboy = new Busboy({headers: request.headers}); 460 + const busboy = new Busboy({ headers: request.headers }); 452 461 let body = ''; 453 462 busboy.on('file', async (fieldName, file, fileName) => { 454 463 body += `${fieldName}=${fileName}`; 455 464 // consume file data 456 465 // eslint-disable-next-line no-empty, no-unused-vars 457 - for await (const c of file) { } 466 + for await (const c of file) { 467 + } 458 468 }); 459 469 460 470 busboy.on('field', (fieldName, value) => { 461 471 body += `${fieldName}=${value}`; 462 472 }); 463 473 busboy.on('finish', () => { 464 - res.end(JSON.stringify({ 465 - method: request.method, 466 - url: request.url, 467 - headers: request.headers, 468 - body 469 - })); 474 + res.end( 475 + JSON.stringify({ 476 + method: request.method, 477 + url: request.url, 478 + headers: request.headers, 479 + body, 480 + }) 481 + ); 470 482 }); 471 483 request.pipe(busboy); 472 484 }
+22 -15
src/fetch.ts
··· 147 147 const protocol = requestOptions.protocol === 'https:' ? https : http; 148 148 const outgoing = protocol.request(requestOptions); 149 149 150 - outgoing.on('response', incoming => { 150 + let incoming: http.IncomingMessage | undefined; 151 + 152 + const destroy = (reason?: any) => { 153 + if (reason) { 154 + outgoing?.destroy(signal?.aborted ? signal.reason : reason); 155 + incoming?.destroy(signal?.aborted ? signal.reason : reason); 156 + reject(signal?.aborted ? signal.reason : reason); 157 + } 158 + }; 159 + 160 + signal?.addEventListener('abort', destroy); 161 + 162 + outgoing.on('response', _incoming => { 163 + if (signal?.aborted) { 164 + return; 165 + } 166 + 167 + incoming = _incoming; 151 168 incoming.setTimeout(0); // Forcefully disable timeout 152 169 incoming.socket.unref(); 170 + incoming.on('error', destroy); 153 171 154 172 const init = { 155 173 status: incoming.statusCode, ··· 209 227 } 210 228 } 211 229 212 - const destroy = (reason?: any) => { 213 - signal?.removeEventListener('abort', destroy); 214 - if (reason) { 215 - incoming.destroy(signal?.aborted ? signal.reason : reason); 216 - reject(signal?.aborted ? signal.reason : reason); 217 - } 218 - }; 219 - 220 - signal?.addEventListener('abort', destroy); 221 - 222 230 let body: Readable | null = incoming; 223 231 const encoding = init.headers.get('Content-Encoding')?.toLowerCase(); 224 232 if (method === 'HEAD' || init.status === 204 || init.status === 304) { ··· 226 234 } else if (encoding != null) { 227 235 init.headers.set('Content-Encoding', encoding); 228 236 body = pipeline(body, createContentDecoder(encoding), destroy); 237 + outgoing.on('error', destroy); 229 238 } 230 239 231 240 resolve( ··· 237 246 ); 238 247 }); 239 248 240 - outgoing.on('error', reject); 249 + outgoing.on('error', destroy); 241 250 242 251 if (!requestHeaders.has('Accept')) requestHeaders.set('Accept', '*/*'); 243 252 if (requestBody.contentType) ··· 261 270 requestBody.body instanceof Stream 262 271 ? requestBody.body 263 272 : Readable.fromWeb(requestBody.body); 264 - pipeline(body, outgoing, error => { 265 - if (error) reject(error); 266 - }); 273 + pipeline(body, outgoing, destroy); 267 274 } 268 275 } 269 276