Mirror: The highly customizable and versatile GraphQL client with which you add on features like normalized caching as you grow.
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(core): Implement GraphQL SSE Response Support (#3050)

authored by

Phil Pluckthun and committed by
GitHub
afc68a18 35be3e56

+96 -196
+5
.changeset/proud-buses-change.md
··· 1 + --- 2 + '@urql/core': minor 3 + --- 4 + 5 + Implement `text/event-stream` response support. This generally adheres to the GraphQL SSE protocol and GraphQL Yoga push responses, and is an alternative to `multipart/mixed`.
+2 -1
packages/core/src/internal/fetchOptions.ts
··· 91 91 92 92 const headers: HeadersInit = { 93 93 accept: 94 - 'application/graphql-response+json, application/graphql+json, application/json, multipart/mixed', 94 + 'application/graphql-response+json, application/graphql+json, application/json, text/event-stream, multipart/mixed', 95 95 }; 96 + 96 97 if (!useGETMethod) headers['content-type'] = 'application/json'; 97 98 const extraOptions = 98 99 (typeof operation.context.fetchOptions === 'function'
+19 -160
packages/core/src/internal/fetchSource.test.ts
··· 202 202 JSON.stringify(json) + 203 203 '\r\n---'; 204 204 205 - it('listens for more responses (stream)', async () => { 205 + it('listens for more streamed responses', async () => { 206 206 fetch.mockResolvedValue({ 207 207 status: 200, 208 208 headers: { ··· 226 226 data: { 227 227 author: { 228 228 id: '1', 229 - name: 'Steve', 230 229 __typename: 'Author', 231 - todos: [{ id: '1', text: 'stream', __typename: 'Todo' }], 232 230 }, 233 231 }, 234 232 }) ··· 240 238 wrap({ 241 239 incremental: [ 242 240 { 243 - path: ['author', 'todos', 1], 244 - data: { id: '2', text: 'defer', __typename: 'Todo' }, 241 + path: ['author'], 242 + data: { name: 'Steve' }, 245 243 }, 246 244 ], 247 245 hasNext: true, ··· 269 267 }, 270 268 }); 271 269 270 + const AuthorFragment = gql` 271 + fragment authorFields on Author { 272 + name 273 + } 274 + `; 275 + 272 276 const streamedQueryOperation: Operation = makeOperation( 273 277 'query', 274 278 { ··· 276 280 query { 277 281 author { 278 282 id 279 - name 280 - todos @stream { 281 - id 282 - text 283 - } 283 + ...authorFields @defer 284 284 } 285 285 } 286 + 287 + ${AuthorFragment} 286 288 `, 287 289 variables: {}, 288 290 key: 1, ··· 301 303 expect(chunks[0].data).toEqual({ 302 304 author: { 303 305 id: '1', 304 - name: 'Steve', 305 306 __typename: 'Author', 306 - todos: [{ id: '1', text: 'stream', __typename: 'Todo' }], 307 307 }, 308 308 }); 309 309 ··· 312 312 id: '1', 313 313 name: 'Steve', 314 314 __typename: 'Author', 315 - todos: [ 316 - { id: '1', text: 'stream', __typename: 'Todo' }, 317 - { id: '2', text: 'defer', __typename: 'Todo' }, 318 - ], 319 315 }, 320 316 }); 321 317 ··· 324 320 id: '1', 325 321 name: 'Steve', 326 322 __typename: 'Author', 327 - todos: [ 328 - { id: '1', text: 'stream', __typename: 'Todo' }, 329 - { id: '2', text: 'defer', __typename: 'Todo' }, 330 - ], 331 323 }, 332 324 }); 333 325 }); 326 + }); 334 327 335 - it('listens for more responses (defer)', async () => { 328 + describe('on text/event-stream', () => { 329 + const wrap = (json: object) => 'data: ' + JSON.stringify(json) + '\n\n'; 330 + 331 + it('listens for streamed responses', async () => { 336 332 fetch.mockResolvedValue({ 337 333 status: 200, 338 334 headers: { 339 335 get() { 340 - return 'multipart/mixed'; 336 + return 'text/event-stream'; 341 337 }, 342 338 }, 343 339 body: { 344 340 getReader: function () { 345 341 let cancelled = false; 346 342 const results = [ 347 - { 348 - done: false, 349 - value: Buffer.from('\r\n---'), 350 - }, 351 343 { 352 344 done: false, 353 345 value: Buffer.from( ··· 378 370 }, 379 371 { 380 372 done: false, 381 - value: Buffer.from(wrap({ hasNext: false }) + '--'), 373 + value: Buffer.from(wrap({ hasNext: false })), 382 374 }, 383 375 { done: true }, 384 376 ]; ··· 449 441 author: { 450 442 id: '1', 451 443 name: 'Steve', 452 - __typename: 'Author', 453 - }, 454 - }); 455 - }); 456 - 457 - it('listens for more responses (defer-neted)', async () => { 458 - fetch.mockResolvedValue({ 459 - status: 200, 460 - headers: { 461 - get() { 462 - return 'multipart/mixed'; 463 - }, 464 - }, 465 - body: { 466 - getReader: function () { 467 - let cancelled = false; 468 - const results = [ 469 - { 470 - done: false, 471 - value: Buffer.from('\r\n---'), 472 - }, 473 - { 474 - done: false, 475 - value: Buffer.from( 476 - wrap({ 477 - hasNext: true, 478 - data: { 479 - author: { 480 - id: '1', 481 - name: 'Steve', 482 - address: { 483 - country: 'UK', 484 - __typename: 'Address', 485 - }, 486 - __typename: 'Author', 487 - }, 488 - }, 489 - }) 490 - ), 491 - }, 492 - { 493 - done: false, 494 - value: Buffer.from( 495 - wrap({ 496 - incremental: [ 497 - { 498 - path: ['author', 'address'], 499 - data: { street: 'home' }, 500 - }, 501 - ], 502 - hasNext: true, 503 - }) 504 - ), 505 - }, 506 - { 507 - done: false, 508 - value: Buffer.from(wrap({ hasNext: false }) + '--'), 509 - }, 510 - { done: true }, 511 - ]; 512 - let count = 0; 513 - return { 514 - cancel: function () { 515 - cancelled = true; 516 - }, 517 - read: function () { 518 - if (cancelled) throw new Error('No'); 519 - 520 - return Promise.resolve(results[count++]); 521 - }, 522 - }; 523 - }, 524 - }, 525 - }); 526 - 527 - const AddressFragment = gql` 528 - fragment addressFields on Address { 529 - street 530 - } 531 - `; 532 - 533 - const streamedQueryOperation: Operation = makeOperation( 534 - 'query', 535 - { 536 - query: gql` 537 - query { 538 - author { 539 - id 540 - address { 541 - id 542 - country 543 - ...addressFields @defer 544 - } 545 - } 546 - } 547 - 548 - ${AddressFragment} 549 - `, 550 - variables: {}, 551 - key: 1, 552 - }, 553 - context 554 - ); 555 - 556 - const chunks: OperationResult[] = await pipe( 557 - makeFetchSource(streamedQueryOperation, 'https://test.com/graphql', {}), 558 - scan((prev: OperationResult[], item) => [...prev, item], []), 559 - toPromise 560 - ); 561 - 562 - expect(chunks.length).toEqual(3); 563 - 564 - expect(chunks[0].data).toEqual({ 565 - author: { 566 - id: '1', 567 - name: 'Steve', 568 - address: { 569 - country: 'UK', 570 - __typename: 'Address', 571 - }, 572 - __typename: 'Author', 573 - }, 574 - }); 575 - 576 - expect(chunks[1].data).toEqual({ 577 - author: { 578 - id: '1', 579 - name: 'Steve', 580 - address: { 581 - country: 'UK', 582 - street: 'home', 583 - __typename: 'Address', 584 - }, 585 444 __typename: 'Author', 586 445 }, 587 446 });
+70 -35
packages/core/src/internal/fetchSource.ts
··· 4 4 5 5 const decoder = typeof TextDecoder !== 'undefined' ? new TextDecoder() : null; 6 6 const boundaryHeaderRe = /boundary="?([^=";]+)"?/i; 7 + const eventStreamRe = /data: ?([^\n]+)/; 7 8 8 9 type ChunkData = Buffer | Uint8Array; 9 10 ··· 17 18 async function* streamBody(response: Response): AsyncIterableIterator<string> { 18 19 if (response.body![Symbol.asyncIterator]) { 19 20 for await (const chunk of response.body! as any) 20 - yield toString(chunk as ChunkData); 21 + toString(chunk as ChunkData); 21 22 } else { 22 23 const reader = response.body!.getReader(); 23 24 let result: ReadableStreamReadResult<ChunkData>; ··· 29 30 } 30 31 } 31 32 33 + async function* split( 34 + chunks: AsyncIterableIterator<string>, 35 + boundary: string 36 + ): AsyncIterableIterator<string> { 37 + let buffer = ''; 38 + let boundaryIndex: number; 39 + for await (const chunk of chunks) { 40 + buffer += chunk; 41 + while ((boundaryIndex = buffer.indexOf(boundary)) > -1) { 42 + yield buffer.slice(0, boundaryIndex); 43 + buffer = buffer.slice(boundaryIndex + boundary.length); 44 + } 45 + } 46 + } 47 + 48 + async function* parseJSON( 49 + response: Response 50 + ): AsyncIterableIterator<ExecutionResult> { 51 + yield JSON.parse(await response.text()); 52 + } 53 + 54 + async function* parseEventStream( 55 + response: Response 56 + ): AsyncIterableIterator<ExecutionResult> { 57 + let payload: any; 58 + for await (const chunk of split(streamBody(response), '\n\n')) { 59 + const match = chunk.match(eventStreamRe); 60 + if (match) { 61 + const chunk = match[1]; 62 + try { 63 + yield (payload = JSON.parse(chunk)); 64 + } catch (error) { 65 + if (!payload) throw error; 66 + } 67 + if (payload && !payload.hasNext) break; 68 + } 69 + } 70 + if (payload && payload.hasNext) { 71 + yield { hasNext: false }; 72 + } 73 + } 74 + 32 75 async function* parseMultipartMixed( 33 76 contentType: string, 34 77 response: Response 35 78 ): AsyncIterableIterator<ExecutionResult> { 36 79 const boundaryHeader = contentType.match(boundaryHeaderRe); 37 80 const boundary = '--' + (boundaryHeader ? boundaryHeader[1] : '-'); 38 - 39 - let buffer = ''; 40 81 let isPreamble = true; 41 - let boundaryIndex: number; 42 82 let payload: any; 43 - 44 - chunks: for await (const chunk of streamBody(response)) { 45 - buffer += chunk; 46 - while ((boundaryIndex = buffer.indexOf(boundary)) > -1) { 47 - if (isPreamble) { 48 - isPreamble = false; 49 - } else { 50 - const chunk = buffer.slice( 51 - buffer.indexOf('\r\n\r\n') + 4, 52 - boundaryIndex 53 - ); 54 - 55 - try { 56 - yield (payload = JSON.parse(chunk)); 57 - } catch (error) { 58 - if (!payload) throw error; 59 - } 83 + for await (const chunk of split(streamBody(response), boundary)) { 84 + if (isPreamble) { 85 + isPreamble = false; 86 + } else { 87 + try { 88 + yield (payload = JSON.parse( 89 + chunk.slice(chunk.indexOf('\r\n\r\n') + 4) 90 + )); 91 + } catch (error) { 92 + if (!payload) throw error; 60 93 } 61 - 62 - buffer = buffer.slice(boundaryIndex + boundary.length); 63 - if (buffer.startsWith('--') || (payload && !payload.hasNext)) 64 - break chunks; 65 94 } 95 + if (payload && !payload.hasNext) break; 66 96 } 67 - 68 - if (payload && payload.hasNext) yield { hasNext: false }; 97 + if (payload && payload.hasNext) { 98 + yield { hasNext: false }; 99 + } 69 100 } 70 101 71 102 async function* fetchOperation( ··· 85 116 // Delay for a tick to give the Client a chance to cancel the request 86 117 // if a teardown comes in immediately 87 118 await Promise.resolve(); 119 + 88 120 response = await (operation.context.fetch || fetch)(url, fetchOptions); 89 121 const contentType = response.headers.get('Content-Type') || ''; 90 - if (/text\//i.test(contentType)) { 91 - const text = await response.text(); 92 - return yield makeErrorResult(operation, new Error(text), response); 93 - } else if (!/multipart\/mixed/i.test(contentType)) { 94 - const text = await response.text(); 95 - return yield makeResult(operation, JSON.parse(text), response); 122 + 123 + let results: AsyncIterable<ExecutionResult>; 124 + if (/multipart\/mixed/i.test(contentType)) { 125 + results = parseMultipartMixed(contentType, response); 126 + } else if (/text\/event-stream/i.test(contentType)) { 127 + results = parseEventStream(response); 128 + } else if (!/text\//i.test(contentType)) { 129 + results = parseJSON(response); 130 + } else { 131 + throw new Error(await response.text()); 96 132 } 97 133 98 - const iterator = parseMultipartMixed(contentType, response); 99 - for await (const payload of iterator) { 134 + for await (const payload of results) { 100 135 yield (result = result 101 136 ? mergeResultPatch(result, payload, response) 102 137 : makeResult(operation, payload, response));