Mirror: The highly customizable and versatile GraphQL client with which you add on features like normalized caching as you grow.
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

(core) - Fix concurrent operations not respecting request policies (#1626)

* Fix up implementation of shared single-source results

* Add test for new request policy behaviour

* Add changeset

authored by

Phil Pluckthun and committed by
GitHub
69bf236e 7d74097f

+170 -143
+5
.changeset/new-dingos-clean.md
··· 1 + --- 2 + '@urql/core': patch 3 + --- 4 + 5 + Fix a regression in `@urql/core@2.1.1` that prevented concurrent operations from being dispatched with differing request policies, which for instance prevented the explicit `executeQuery` calls on bindings to fail.
+57
packages/core/src/client.test.ts
··· 565 565 expect(resultTwo).toHaveBeenCalledTimes(1); 566 566 }); 567 567 568 + it('dispatches the correct request policy on subsequent sources', async () => { 569 + const exchange: Exchange = () => ops$ => { 570 + let i = 0; 571 + return pipe( 572 + ops$, 573 + map(op => ({ 574 + data: ++i, 575 + operation: op, 576 + })), 577 + delay(1) 578 + ); 579 + }; 580 + 581 + const client = createClient({ 582 + url: 'test', 583 + exchanges: [exchange], 584 + }); 585 + 586 + const resultOne = jest.fn(); 587 + const resultTwo = jest.fn(); 588 + const operationOne = makeOperation('query', queryOperation, { 589 + ...queryOperation.context, 590 + requestPolicy: 'cache-first', 591 + }); 592 + const operationTwo = makeOperation('query', queryOperation, { 593 + ...queryOperation.context, 594 + requestPolicy: 'network-only', 595 + }); 596 + 597 + pipe(client.executeRequestOperation(operationOne), subscribe(resultOne)); 598 + 599 + expect(resultOne).toHaveBeenCalledTimes(0); 600 + 601 + jest.advanceTimersByTime(1); 602 + 603 + expect(resultOne).toHaveBeenCalledTimes(1); 604 + expect(resultOne).toHaveBeenCalledWith({ 605 + data: 1, 606 + operation: operationOne, 607 + }); 608 + 609 + pipe(client.executeRequestOperation(operationTwo), subscribe(resultTwo)); 610 + 611 + expect(resultTwo).toHaveBeenCalledWith({ 612 + data: 1, 613 + operation: operationOne, 614 + stale: true, 615 + }); 616 + 617 + jest.advanceTimersByTime(1); 618 + 619 + expect(resultTwo).toHaveBeenCalledWith({ 620 + data: 2, 621 + operation: operationTwo, 622 + }); 623 + }); 624 + 568 625 it('replays results from prior operation result as needed (network-only)', async () => { 569 626 const exchange: Exchange = () => ops$ => { 570 627 let i = 0;
+106 -84
packages/core/src/client.ts
··· 4 4 filter, 5 5 makeSubject, 6 6 onEnd, 7 + onPush, 7 8 onStart, 8 9 pipe, 9 10 share, ··· 41 42 import { 42 43 createRequest, 43 44 withPromise, 44 - replayOnStart, 45 45 maskTypename, 46 46 noop, 47 47 makeOperation, ··· 69 69 70 70 export const createClient = (opts: ClientOptions) => new Client(opts); 71 71 72 + interface ActiveOperation { 73 + source: Source<OperationResult>; 74 + replay: OperationResult | null; 75 + } 76 + 72 77 /** The URQL application-wide client library. Each execute method starts a GraphQL request and returns a stream of results. */ 73 78 export class Client { 74 79 /** Start an operation from an exchange */ ··· 90 95 dispatchOperation: (operation?: Operation | void) => void; 91 96 operations$: Source<Operation>; 92 97 results$: Source<OperationResult>; 93 - activeOperations: Map<number, Source<OperationResult>> = new Map(); 98 + activeOperations: Map<number, ActiveOperation> = new Map(); 94 99 queue: Operation[] = []; 95 100 96 101 constructor(opts: ClientOptions) { ··· 198 203 executeRequestOperation<Data = any, Variables = object>( 199 204 operation: Operation<Data, Variables> 200 205 ): Source<OperationResult<Data, Variables>> { 201 - let active = this.activeOperations.get(operation.key); 202 - if (active) return active; 203 - 204 - let result$ = pipe( 205 - this.results$, 206 - filter((res: OperationResult) => res.operation.key === operation.key) 207 - ) as Source<OperationResult<Data, Variables>>; 208 - if (this.maskTypename) { 209 - result$ = pipe( 210 - result$, 211 - map(res => ({ ...res, data: maskTypename(res.data) })) 206 + let activeOperation = this.activeOperations.get(operation.key); 207 + if (!activeOperation) { 208 + let result$ = pipe( 209 + this.results$, 210 + filter( 211 + (res: OperationResult) => 212 + res.operation.kind === operation.kind && 213 + res.operation.key === operation.key 214 + ) 212 215 ); 213 - } 214 216 215 - // A mutation is always limited to just a single result and is never shared 216 - if (operation.kind === 'mutation') { 217 - return pipe( 218 - result$, 219 - onStart<OperationResult>(() => this.dispatchOperation(operation)), 220 - take(1) 221 - ); 222 - } 217 + // Mask typename properties if the option for it is turned on 218 + if (this.maskTypename) { 219 + result$ = pipe( 220 + result$, 221 + map(res => ({ ...res, data: maskTypename(res.data) })) 222 + ); 223 + } 223 224 224 - const teardown$ = pipe( 225 - this.operations$, 226 - filter( 227 - (op: Operation) => op.kind === 'teardown' && op.key === operation.key 228 - ) 229 - ); 225 + // A mutation is always limited to just a single result and is never shared 226 + if (operation.kind === 'mutation') { 227 + return pipe( 228 + result$, 229 + onStart(() => this.dispatchOperation(operation)), 230 + take(1) 231 + ); 232 + } 230 233 231 - const refetch$ = pipe( 232 - this.operations$, 233 - filter( 234 - (op: Operation) => 235 - op.kind === operation.kind && 236 - op.key === operation.key && 237 - (op.context.requestPolicy === 'network-only' || 238 - op.context.requestPolicy === 'cache-and-network') 239 - ), 240 - take(1) 241 - ); 242 - 243 - result$ = pipe( 244 - result$, 245 - takeUntil(teardown$), 246 - switchMap(result => { 247 - if (result.stale) return fromValue(result); 248 - return merge([ 249 - fromValue(result), 250 - pipe( 251 - refetch$, 252 - map(() => ({ ...result, stale: true })) 234 + activeOperation = { 235 + source: pipe( 236 + result$, 237 + // End the results stream when an active teardown event is sent 238 + takeUntil( 239 + pipe( 240 + this.operations$, 241 + filter(op => op.kind === 'teardown' && op.key === operation.key) 242 + ) 253 243 ), 254 - ]); 255 - }), 256 - onEnd<OperationResult>(() => { 257 - this.activeOperations.delete(operation.key); 258 - for (let i = this.queue.length - 1; i >= 0; i--) 259 - if (this.queue[i].key === operation.key) this.queue.splice(i, 1); 260 - this.dispatchOperation( 261 - makeOperation('teardown', operation, operation.context) 262 - ); 263 - }) 264 - ); 244 + switchMap(result => { 245 + if (result.stale) { 246 + return fromValue(result); 247 + } 248 + 249 + return merge([ 250 + fromValue(result), 251 + // Mark a result as stale when a new operation is sent for it 252 + pipe( 253 + this.operations$, 254 + filter(op => { 255 + return ( 256 + op.kind === operation.kind && 257 + op.key === operation.key && 258 + (op.context.requestPolicy === 'network-only' || 259 + op.context.requestPolicy === 'cache-and-network') 260 + ); 261 + }), 262 + take(1), 263 + map(() => ({ ...result, stale: true })) 264 + ), 265 + ]); 266 + }), 267 + onPush(result => { 268 + activeOperation!.replay = result; 269 + }), 270 + onEnd(() => { 271 + // Delete the active operation handle 272 + activeOperation!.replay = null; 273 + this.activeOperations.delete(operation.key); 274 + // Delete all queued up operations of the same key on end 275 + for (let i = this.queue.length - 1; i >= 0; i--) 276 + if (this.queue[i].key === operation.key) this.queue.splice(i, 1); 277 + // Dispatch a teardown signal for the stopped operation 278 + this.dispatchOperation( 279 + makeOperation('teardown', operation, operation.context) 280 + ); 281 + }), 282 + share 283 + ), 284 + replay: null, 285 + }; 286 + } 287 + 288 + const prevReplay = activeOperation.replay; 289 + const subject = makeSubject<OperationResult>(); 290 + const isNetworkOperation = 291 + operation.context.requestPolicy === 'cache-and-network' || 292 + operation.context.requestPolicy === 'network-only'; 265 293 266 - if (operation.kind === 'subscription') { 267 - active = pipe( 268 - result$, 269 - onStart(() => { 270 - this.activeOperations.set(operation.key, active!); 294 + return pipe( 295 + merge([subject.source, activeOperation.source]), 296 + onStart(() => { 297 + this.activeOperations.set(operation.key, activeOperation!); 298 + if (operation.kind === 'subscription') { 299 + return this.dispatchOperation(operation); 300 + } else if (isNetworkOperation) { 271 301 this.dispatchOperation(operation); 272 - }), 273 - share 274 - ); 275 - } else { 276 - const mode = 277 - operation.context.requestPolicy === 'cache-and-network' || 278 - operation.context.requestPolicy === 'network-only' 279 - ? 'pre' 280 - : 'post'; 281 - active = pipe( 282 - result$, 283 - replayOnStart(mode, () => { 284 - this.activeOperations.set(operation.key, active!); 302 + } 303 + 304 + if (prevReplay != null && prevReplay === activeOperation!.replay) { 305 + subject.next( 306 + isNetworkOperation ? { ...prevReplay, stale: true } : prevReplay 307 + ); 308 + } else if (!isNetworkOperation) { 285 309 this.dispatchOperation(operation); 286 - }) 287 - ); 288 - } 289 - 290 - return active; 310 + } 311 + }) 312 + ); 291 313 } 292 314 293 315 query<Data = any, Variables extends object = {}>(
+2 -59
packages/core/src/utils/streamUtils.ts
··· 1 - import { 2 - Operator, 3 - Source, 4 - pipe, 5 - toPromise, 6 - take, 7 - share, 8 - onPush, 9 - onStart, 10 - onEnd, 11 - subscribe, 12 - make, 13 - } from 'wonka'; 1 + import { Source, pipe, toPromise, take } from 'wonka'; 14 2 15 - import { OperationResult, PromisifiedSource } from '../types'; 3 + import { PromisifiedSource } from '../types'; 16 4 17 5 export function withPromise<T>(source$: Source<T>): PromisifiedSource<T> { 18 6 (source$ as PromisifiedSource<T>).toPromise = () => 19 7 pipe(source$, take(1), toPromise); 20 8 return source$ as PromisifiedSource<T>; 21 9 } 22 - 23 - export type ReplayMode = 'pre' | 'post'; 24 - 25 - export function replayOnStart<T extends OperationResult>( 26 - mode: ReplayMode, 27 - start: () => void 28 - ): Operator<T, T> { 29 - return source$ => { 30 - let replay: T | void; 31 - 32 - const shared$ = pipe( 33 - source$, 34 - onEnd(() => { 35 - replay = undefined; 36 - }), 37 - onPush(value => { 38 - replay = value; 39 - }), 40 - share 41 - ); 42 - 43 - return make<T>(observer => { 44 - const prevReplay = replay; 45 - 46 - return pipe( 47 - shared$, 48 - onEnd(observer.complete), 49 - onStart(() => { 50 - if (mode === 'pre') { 51 - start(); 52 - } 53 - 54 - if (prevReplay !== undefined && prevReplay === replay) { 55 - observer.next( 56 - mode === 'pre' ? { ...prevReplay, stale: true } : prevReplay 57 - ); 58 - } else if (mode === 'post') { 59 - start(); 60 - } 61 - }), 62 - subscribe(observer.next) 63 - ).unsubscribe; 64 - }); 65 - }; 66 - }