Mirror: The highly customizable and versatile GraphQL client with which you add on features like normalized caching as you grow.
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

(core) - Make Client more forgiving by sharing result sources that emit cached results (#1515)

* Replace operation semaphore with shared behavior source

Replace the client's counting of active operations with a
single, shared operation result source in a Map. This achieves
the same semaphore behaviour but replaces it with a binary
semaphore-like cache, where the cached source remembers the
last active result.

* Prevent emitting cached result if a new result emitted first

* Reuse client.query() in client.readQuery()

* Add tests for shared source behaviour

* Abstract replay logic in Client

* Add test for shared subscription behaviour

* Fix linting issues

authored by

Phil Pluckthun and committed by
GitHub
e8ae745b c5e1422f

+324 -64
+1 -1
packages/core/src/__snapshots__/client.test.ts.snap
··· 2 2 3 3 exports[`createClient passes snapshot 1`] = ` 4 4 Client { 5 - "activeOperations": Object {}, 5 + "activeOperations": Map {}, 6 6 "createOperationContext": [Function], 7 7 "createRequestOperation": [Function], 8 8 "dispatchOperation": [Function],
+223 -1
packages/core/src/client.test.ts
··· 11 11 filter, 12 12 toArray, 13 13 tap, 14 + take, 14 15 } from 'wonka'; 16 + 15 17 import { gql } from './gql'; 16 18 import { Exchange, Operation, OperationResult } from './types'; 17 19 import { makeOperation } from './utils'; 18 20 import { createClient } from './client'; 19 - import { queryOperation } from './test-utils'; 21 + import { queryOperation, subscriptionOperation } from './test-utils'; 20 22 21 23 const url = 'https://hostname.com'; 22 24 ··· 507 509 unsubscribe(); 508 510 }); 509 511 }); 512 + 513 + describe('shared sources behavior', () => { 514 + beforeEach(() => { 515 + jest.useFakeTimers(); 516 + }); 517 + 518 + afterEach(() => { 519 + jest.useRealTimers(); 520 + }); 521 + 522 + it('replays results from prior operation result as needed', async () => { 523 + const exchange: Exchange = () => ops$ => { 524 + let i = 0; 525 + return pipe( 526 + ops$, 527 + map(op => ({ 528 + data: ++i, 529 + operation: op, 530 + })), 531 + delay(1) 532 + ); 533 + }; 534 + 535 + const client = createClient({ 536 + url: 'test', 537 + exchanges: [exchange], 538 + }); 539 + 540 + const resultOne = jest.fn(); 541 + const resultTwo = jest.fn(); 542 + 543 + pipe(client.executeRequestOperation(queryOperation), subscribe(resultOne)); 544 + 545 + expect(resultOne).toHaveBeenCalledTimes(0); 546 + 547 + jest.advanceTimersByTime(1); 548 + 549 + expect(resultOne).toHaveBeenCalledTimes(1); 550 + expect(resultOne).toHaveBeenCalledWith({ 551 + data: 1, 552 + operation: queryOperation, 553 + }); 554 + 555 + pipe(client.executeRequestOperation(queryOperation), subscribe(resultTwo)); 556 + 557 + expect(resultTwo).toHaveBeenCalledWith({ 558 + data: 1, 559 + stale: true, 560 + operation: queryOperation, 561 + }); 562 + 563 + jest.advanceTimersByTime(1); 564 + 565 + expect(resultTwo).toHaveBeenCalledWith({ 566 + data: 2, 567 + operation: queryOperation, 568 + }); 569 + }); 570 + 571 + it('replayed results are not emitted on the shared source', () => { 572 + const exchange: Exchange = () => ops$ => { 573 + let i = 0; 574 + return pipe( 575 + ops$, 576 + map(op => ({ 577 + data: ++i, 578 + operation: op, 579 + })), 580 + take(1) 581 + ); 582 + }; 583 + 584 + const client = createClient({ 585 + url: 'test', 586 + exchanges: [exchange], 587 + }); 588 + 589 + const resultOne = jest.fn(); 590 + const resultTwo = jest.fn(); 591 + 592 + pipe(client.executeRequestOperation(queryOperation), subscribe(resultOne)); 593 + 594 + pipe(client.executeRequestOperation(queryOperation), subscribe(resultTwo)); 595 + 596 + expect(resultOne).toHaveBeenCalledTimes(1); 597 + expect(resultTwo).toHaveBeenCalledTimes(1); 598 + 599 + expect(resultTwo).toHaveBeenCalledWith({ 600 + data: 1, 601 + operation: queryOperation, 602 + stale: true, 603 + }); 604 + }); 605 + 606 + it('does nothing when no operation result has been emitted yet', () => { 607 + const exchange: Exchange = () => ops$ => { 608 + return pipe( 609 + ops$, 610 + map(op => ({ data: 1, operation: op })), 611 + filter(() => false) 612 + ); 613 + }; 614 + 615 + const client = createClient({ 616 + url: 'test', 617 + exchanges: [exchange], 618 + }); 619 + 620 + const resultOne = jest.fn(); 621 + const resultTwo = jest.fn(); 622 + 623 + pipe(client.executeRequestOperation(queryOperation), subscribe(resultOne)); 624 + 625 + pipe(client.executeRequestOperation(queryOperation), subscribe(resultTwo)); 626 + 627 + expect(resultOne).toHaveBeenCalledTimes(0); 628 + expect(resultTwo).toHaveBeenCalledTimes(0); 629 + }); 630 + 631 + it('skips replaying results when a result is emitted immediately', () => { 632 + const exchange: Exchange = () => ops$ => { 633 + let i = 0; 634 + return pipe( 635 + ops$, 636 + map(op => ({ data: ++i, operation: op })) 637 + ); 638 + }; 639 + 640 + const client = createClient({ 641 + url: 'test', 642 + exchanges: [exchange], 643 + }); 644 + 645 + const resultOne = jest.fn(); 646 + const resultTwo = jest.fn(); 647 + 648 + pipe(client.executeRequestOperation(queryOperation), subscribe(resultOne)); 649 + 650 + expect(resultOne).toHaveBeenCalledWith({ 651 + data: 1, 652 + operation: queryOperation, 653 + }); 654 + 655 + pipe(client.executeRequestOperation(queryOperation), subscribe(resultTwo)); 656 + 657 + expect(resultTwo).toHaveBeenCalledWith({ 658 + data: 2, 659 + operation: queryOperation, 660 + }); 661 + 662 + expect(resultOne).toHaveBeenCalledWith({ 663 + data: 2, 664 + operation: queryOperation, 665 + }); 666 + }); 667 + 668 + it('replays stale results as needed', () => { 669 + const exchange: Exchange = () => ops$ => { 670 + return pipe( 671 + ops$, 672 + map(op => ({ stale: true, data: 1, operation: op })), 673 + take(1) 674 + ); 675 + }; 676 + 677 + const client = createClient({ 678 + url: 'test', 679 + exchanges: [exchange], 680 + }); 681 + 682 + const resultOne = jest.fn(); 683 + const resultTwo = jest.fn(); 684 + 685 + pipe(client.executeRequestOperation(queryOperation), subscribe(resultOne)); 686 + 687 + expect(resultOne).toHaveBeenCalledWith({ 688 + data: 1, 689 + operation: queryOperation, 690 + stale: true, 691 + }); 692 + 693 + pipe(client.executeRequestOperation(queryOperation), subscribe(resultTwo)); 694 + 695 + expect(resultTwo).toHaveBeenCalledWith({ 696 + data: 1, 697 + operation: queryOperation, 698 + stale: true, 699 + }); 700 + }); 701 + 702 + it('does nothing when operation is a subscription has been emitted yet', () => { 703 + const exchange: Exchange = () => ops$ => { 704 + return pipe( 705 + ops$, 706 + map(op => ({ data: 1, operation: op })), 707 + take(1) 708 + ); 709 + }; 710 + 711 + const client = createClient({ 712 + url: 'test', 713 + exchanges: [exchange], 714 + }); 715 + 716 + const resultOne = jest.fn(); 717 + const resultTwo = jest.fn(); 718 + 719 + pipe( 720 + client.executeRequestOperation(subscriptionOperation), 721 + subscribe(resultOne) 722 + ); 723 + expect(resultOne).toHaveBeenCalledTimes(1); 724 + 725 + pipe( 726 + client.executeRequestOperation(subscriptionOperation), 727 + subscribe(resultTwo) 728 + ); 729 + expect(resultTwo).toHaveBeenCalledTimes(0); 730 + }); 731 + });
+45 -53
packages/core/src/client.ts
··· 41 41 import { 42 42 createRequest, 43 43 withPromise, 44 + replayOnStart, 44 45 maskTypename, 45 46 noop, 46 47 makeOperation, ··· 66 67 maskTypename?: boolean; 67 68 } 68 69 69 - interface ActiveOperations { 70 - [operationKey: string]: number; 71 - } 72 - 73 70 export const createClient = (opts: ClientOptions) => new Client(opts); 74 71 75 72 /** The URQL application-wide client library. Each execute method starts a GraphQL request and returns a stream of results. */ ··· 93 90 dispatchOperation: (operation?: Operation | void) => void; 94 91 operations$: Source<Operation>; 95 92 results$: Source<OperationResult>; 96 - activeOperations = Object.create(null) as ActiveOperations; 93 + activeOperations: Map<number, Source<OperationResult>> = new Map(); 97 94 queue: Operation[] = []; 98 95 99 96 constructor(opts: ClientOptions) { ··· 138 135 // operation's exchange results 139 136 if ( 140 137 operation.kind === 'mutation' || 141 - (this.activeOperations[operation.key] || 0) > 0 138 + this.activeOperations.has(operation.key) 142 139 ) { 143 140 this.queue.push(operation); 144 141 if (!isOperationBatchActive) { ··· 197 194 this.createOperationContext(opts) 198 195 ); 199 196 200 - /** Counts up the active operation key and dispatches the operation */ 201 - private onOperationStart(operation: Operation) { 202 - const { key } = operation; 203 - this.activeOperations[key] = (this.activeOperations[key] || 0) + 1; 204 - this.dispatchOperation(operation); 205 - } 206 - 207 - /** Deletes an active operation's result observable and sends a teardown signal through the exchange pipeline */ 208 - private onOperationEnd(operation: Operation) { 209 - const { key } = operation; 210 - const prevActive = this.activeOperations[key] || 0; 211 - const newActive = (this.activeOperations[key] = 212 - prevActive <= 0 ? 0 : prevActive - 1); 213 - // Check whether this operation has now become inactive 214 - if (newActive <= 0) { 215 - // Delete all related queued up operations for the inactive one 216 - for (let i = this.queue.length - 1; i >= 0; i--) 217 - if (this.queue[i].key === operation.key) this.queue.splice(i, 1); 218 - // Issue the cancellation teardown operation 219 - this.dispatchOperation( 220 - makeOperation('teardown', operation, operation.context) 221 - ); 222 - } 223 - } 224 - 225 197 /** Executes an Operation by sending it through the exchange pipeline It returns an observable that emits all related exchange results and keeps track of this observable's subscribers. A teardown signal will be emitted when no subscribers are listening anymore. */ 226 198 executeRequestOperation<Data = any, Variables = object>( 227 199 operation: Operation<Data, Variables> 228 200 ): Source<OperationResult<Data, Variables>> { 229 - let operationResults$ = pipe( 201 + let active = this.activeOperations.get(operation.key); 202 + if (active) return active; 203 + 204 + let result$ = pipe( 230 205 this.results$, 231 206 filter((res: OperationResult) => res.operation.key === operation.key) 232 207 ) as Source<OperationResult<Data, Variables>>; 233 - 234 208 if (this.maskTypename) { 235 - operationResults$ = pipe( 236 - operationResults$, 237 - map(res => { 238 - res.data = maskTypename(res.data); 239 - return res; 240 - }) 209 + result$ = pipe( 210 + result$, 211 + map(res => ({ ...res, data: maskTypename(res.data) })) 241 212 ); 242 213 } 243 214 215 + // A mutation is always limited to just a single result and is never shared 244 216 if (operation.kind === 'mutation') { 245 - // A mutation is always limited to just a single result and is never shared 246 217 return pipe( 247 - operationResults$, 218 + result$, 248 219 onStart<OperationResult>(() => this.dispatchOperation(operation)), 249 220 take(1) 250 221 ); ··· 263 234 (op: Operation) => 264 235 op.kind === operation.kind && 265 236 op.key === operation.key && 266 - op.context.requestPolicy !== 'cache-only' 267 - ) 237 + (op.context.requestPolicy === 'network-only' || 238 + op.context.requestPolicy === 'cache-and-network') 239 + ), 240 + take(1) 268 241 ); 269 242 270 - const result$ = pipe( 271 - operationResults$, 243 + result$ = pipe( 244 + result$, 272 245 takeUntil(teardown$), 273 246 switchMap(result => { 274 247 if (result.stale) return fromValue(result); 275 - 276 248 return merge([ 277 249 fromValue(result), 278 250 pipe( 279 251 refetch$, 280 - take(1), 281 252 map(() => ({ ...result, stale: true })) 282 253 ), 283 254 ]); 284 - }), 285 - onStart<OperationResult>(() => { 286 - this.onOperationStart(operation); 287 255 }), 288 256 onEnd<OperationResult>(() => { 289 - this.onOperationEnd(operation); 257 + this.activeOperations.delete(operation.key); 258 + for (let i = this.queue.length - 1; i >= 0; i--) 259 + if (this.queue[i].key === operation.key) this.queue.splice(i, 1); 260 + this.dispatchOperation( 261 + makeOperation('teardown', operation, operation.context) 262 + ); 290 263 }) 291 264 ); 292 265 293 - return result$; 266 + if (operation.kind === 'subscription') { 267 + active = pipe( 268 + result$, 269 + onStart(() => { 270 + this.activeOperations.set(operation.key, active!); 271 + this.dispatchOperation(operation); 272 + }), 273 + share 274 + ); 275 + } else { 276 + active = pipe( 277 + result$, 278 + replayOnStart(() => { 279 + this.activeOperations.set(operation.key, active!); 280 + this.dispatchOperation(operation); 281 + }) 282 + ); 283 + } 284 + 285 + return active; 294 286 } 295 287 296 288 query<Data = any, Variables extends object = {}>( ··· 318 310 let result: OperationResult<Data, Variables> | null = null; 319 311 320 312 pipe( 321 - this.executeQuery(createRequest(query, variables), context), 313 + this.query(query, variables, context), 322 314 subscribe(res => { 323 315 result = res; 324 316 })
+1 -1
packages/core/src/utils/index.ts
··· 4 4 export * from './typenames'; 5 5 export * from './stringifyVariables'; 6 6 export * from './maskTypename'; 7 - export * from './withPromise'; 7 + export * from './streamUtils'; 8 8 export * from './operation'; 9 9 10 10 export const noop = () => {
+54
packages/core/src/utils/streamUtils.ts
··· 1 + import { 2 + Operator, 3 + Source, 4 + pipe, 5 + toPromise, 6 + take, 7 + share, 8 + onPush, 9 + onStart, 10 + onEnd, 11 + subscribe, 12 + make, 13 + } from 'wonka'; 14 + 15 + import { OperationResult, PromisifiedSource } from '../types'; 16 + 17 + export function withPromise<T>(source$: Source<T>): PromisifiedSource<T> { 18 + (source$ as PromisifiedSource<T>).toPromise = () => 19 + pipe(source$, take(1), toPromise); 20 + return source$ as PromisifiedSource<T>; 21 + } 22 + 23 + export function replayOnStart<T extends OperationResult>( 24 + start?: () => void 25 + ): Operator<T, T> { 26 + return source$ => { 27 + let replay: T | void; 28 + 29 + const shared$ = pipe( 30 + source$, 31 + onPush(value => { 32 + replay = value; 33 + }), 34 + share 35 + ); 36 + 37 + return make<T>(observer => { 38 + const prevReplay = replay; 39 + 40 + const subscription = pipe( 41 + shared$, 42 + onEnd(observer.complete), 43 + onStart(() => { 44 + if (start) start(); 45 + if (prevReplay !== undefined && prevReplay === replay) 46 + observer.next({ ...prevReplay, stale: true }); 47 + }), 48 + subscribe(observer.next) 49 + ); 50 + 51 + return subscription.unsubscribe; 52 + }); 53 + }; 54 + }
-8
packages/core/src/utils/withPromise.ts
··· 1 - import { Source, pipe, toPromise, take } from 'wonka'; 2 - import { PromisifiedSource } from '../types'; 3 - 4 - export function withPromise<T>(source$: Source<T>): PromisifiedSource<T> { 5 - (source$ as PromisifiedSource<T>).toPromise = () => 6 - pipe(source$, take(1), toPromise); 7 - return source$ as PromisifiedSource<T>; 8 - }