Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix operators that may stall when cold sources aren't receiving P… (#60)

* Fix synchronous make Close signal

* Continue allowing Pull/Close/Push on inner mergeMap sources

* Continue allowing Pull/Close/Push on switchMap's inner source

* Continue allowing Pull/Close/Push on concatMap's inner source

* Fix missing Pull in mergeMap and add merge source test

* Implement alternative passesSourcePushThenEnd suite

This is for eager operators that send an additional
Pull signal when they're not expecting one to come
in from the sink. All {merge,switch,concat}Map operators
need to pass this instead of passSourceEnd.

* Add active pulling to sample

sample needs to also actively send Pull signals as
it may not forward any that will cause Pull signals.

* Add active pulling to buffer

* Add active pulling to takeUntil

* Ensure merge sources are instantiated first

Ensure that all merge sources are eagerly pulled
and started without hanging and waiting for the
first source.

Before `merge` and `mergeMap` would behave like
`concat` and `concatMap` when the first source
wouldn't push or end.

* Fix timing of eager pull in mergeMap

* Add test for interval source to merge source

* Revert change to make teardown

authored by

Phil Plückthun and committed by
GitHub
3778eb6d c1b0d309

+325 -71
+67 -36
src/wonka_operators.re
··· 48 48 ); 49 49 | Push(value) when !state.ended => 50 50 Rebel.MutableQueue.add(state.buffer, value); 51 - state.pulled = false; 52 - state.notifierTalkback(. Pull); 51 + if (!state.pulled) { 52 + state.pulled = true; 53 + state.sourceTalkback(. Pull); 54 + state.notifierTalkback(. Pull); 55 + } else { 56 + state.pulled = false; 57 + }; 53 58 | Push(_) => () 54 59 | End when !state.ended => 55 60 state.ended = true; ··· 218 223 switch (Rebel.MutableQueue.pop(state.inputQueue)) { 219 224 | Some(input) => applyInnerSource(f(. input)) 220 225 | None when state.ended => sink(. End) 226 + | None when !state.outerPulled => 227 + state.outerPulled = true; 228 + state.outerTalkback(. Pull); 221 229 | None => () 222 230 }; 223 231 | End => () ··· 228 236 switch (signal) { 229 237 | Start(tb) => state.outerTalkback = tb 230 238 | Push(x) when !state.ended => 231 - if (!state.outerPulled) { 232 - state.outerPulled = true; 233 - state.outerTalkback(. Pull); 234 - } else { 235 - state.outerPulled = false; 236 - }; 237 - 239 + state.outerPulled = false; 238 240 if (state.innerActive) { 239 241 Rebel.MutableQueue.add(state.inputQueue, x); 240 242 } else { ··· 264 266 state.innerPulled = true; 265 267 state.innerTalkback(. Pull); 266 268 }; 267 - | Close when !state.ended => 268 - state.ended = true; 269 - state.innerActive = false; 270 - state.innerTalkback(. Close); 271 - state.outerTalkback(. Close); 272 - | Close => () 269 + | Close => 270 + if (!state.ended) { 271 + state.ended = true; 272 + state.outerTalkback(. Close); 273 + }; 274 + if (state.innerActive) { 275 + state.innerActive = false; 276 + state.innerTalkback(. Close); 277 + }; 273 278 }, 274 279 ), 275 280 ); ··· 355 360 | End when Rebel.Array.size(state.innerTalkbacks) !== 0 => 356 361 state.innerTalkbacks = 357 362 Rebel.Array.filter(state.innerTalkbacks, x => x !== talkback^); 358 - if (state.ended && Rebel.Array.size(state.innerTalkbacks) === 0) { 363 + let exhausted = Rebel.Array.size(state.innerTalkbacks) === 0; 364 + if (state.ended && exhausted) { 359 365 sink(. End); 366 + } else if (!state.outerPulled && exhausted) { 367 + state.outerPulled = true; 368 + state.outerTalkback(. Pull); 360 369 }; 361 370 | End => () 362 371 } ··· 369 378 | Push(x) when !state.ended => 370 379 state.outerPulled = false; 371 380 applyInnerSource(f(. x)); 381 + if (!state.outerPulled) { 382 + state.outerPulled = true; 383 + state.outerTalkback(. Pull); 384 + }; 372 385 | Push(_) => () 373 386 | End when !state.ended => 374 387 state.ended = true; ··· 382 395 sink(. 383 396 Start( 384 397 (. signal) => 385 - if (!state.ended) { 386 - switch (signal) { 387 - | Close => 388 - let tbs = state.innerTalkbacks; 398 + switch (signal) { 399 + | Close => 400 + if (!state.ended) { 389 401 state.ended = true; 390 - state.innerTalkbacks = Rebel.Array.makeEmpty(); 391 402 state.outerTalkback(. signal); 392 - Rebel.Array.forEach(tbs, tb => tb(. signal)); 393 - | Pull => 394 - if (!state.outerPulled) { 395 - state.outerPulled = true; 396 - state.outerTalkback(. Pull); 397 - }; 403 + }; 398 404 399 - Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. Pull)); 405 + Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. signal)); 406 + state.innerTalkbacks = Rebel.Array.makeEmpty(); 407 + | Pull => 408 + if (!state.outerPulled && !state.ended) { 409 + state.outerPulled = true; 410 + state.outerTalkback(. Pull); 411 + } else { 412 + state.outerPulled = false; 400 413 }; 414 + 415 + Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. Pull)); 401 416 }, 402 417 ), 403 418 ); ··· 500 515 | Start(tb) => state.sourceTalkback = tb 501 516 | Push(x) => 502 517 state.value = Some(x); 503 - state.notifierTalkback(. Pull); 518 + if (!state.pulled) { 519 + state.pulled = true; 520 + state.notifierTalkback(. Pull); 521 + state.sourceTalkback(. Pull); 522 + } else { 523 + state.pulled = false; 524 + }; 504 525 | End when !state.ended => 505 526 state.ended = true; 506 527 state.notifierTalkback(. Close); ··· 682 703 | Push(_) when !state.skip && !state.ended => 683 704 state.pulled = false; 684 705 sink(. signal); 685 - | Push(_) => () 706 + | Push(_) when !state.pulled => 707 + state.pulled = true; 708 + state.sourceTalkback(. Pull); 709 + state.notifierTalkback(. Pull); 710 + | Push(_) => state.pulled = false 686 711 | End => 687 712 if (state.skip) { 688 713 state.notifierTalkback(. Close); ··· 791 816 state.innerActive = false; 792 817 if (state.ended) { 793 818 sink(. signal); 819 + } else if (!state.outerPulled) { 820 + state.outerPulled = true; 821 + state.outerTalkback(. Pull); 794 822 }; 795 823 | End => () 796 824 } ··· 836 864 state.innerPulled = true; 837 865 state.innerTalkback(. Pull); 838 866 }; 839 - | Close when !state.ended => 840 - state.ended = true; 841 - state.innerActive = false; 842 - state.innerTalkback(. Close); 843 - state.outerTalkback(. Close); 844 - | Close => () 867 + | Close => 868 + if (!state.ended) { 869 + state.ended = true; 870 + state.outerTalkback(. Close); 871 + }; 872 + if (state.innerActive) { 873 + state.innerActive = false; 874 + state.innerTalkback(. Close); 875 + }; 845 876 }, 846 877 ), 847 878 );
+150 -20
src/wonka_operators.test.ts
··· 99 99 it('responds to Close signals from sink (spec)', () => { 100 100 let talkback = null; 101 101 let closing = 0; 102 - let pulls = 0; 103 102 104 103 const source: types.sourceT<any> = sink => { 105 104 sink(deriving.start(tb => { 106 - if (tb === deriving.pull) { 107 - pulls++; 108 - if (!closing) sink(deriving.push(0)); 109 - } if (tb === deriving.close) { 105 + if (tb === deriving.pull && !closing) { 106 + sink(deriving.push(0)); 107 + } else if (tb === deriving.close) { 110 108 closing++; 111 109 } 112 110 })); ··· 127 125 talkback(deriving.pull); 128 126 jest.runAllTimers(); 129 127 expect(closing).toBe(1); 130 - expect(pulls).toBe(1); 131 128 }); 132 129 133 130 /* This tests a noop operator for End signals from the source. ··· 138 135 operator: types.operatorT<any, any>, 139 136 result: any = 0 140 137 ) => 138 + it('passes on immediate Push then End signals from source (spec)', () => { 139 + const signals = []; 140 + let talkback = null; 141 + let pulls = 0; 142 + let ending = 0; 143 + 144 + const source: types.sourceT<any> = sink => { 145 + sink(deriving.start(tb => { 146 + expect(tb).not.toBe(deriving.close); 147 + if (tb === deriving.pull) { 148 + pulls++; 149 + if (pulls === 1) { 150 + sink(deriving.push(0)); 151 + sink(deriving.end()); 152 + } 153 + } 154 + })); 155 + }; 156 + 157 + const sink: types.sinkT<any> = signal => { 158 + if (deriving.isStart(signal)) { 159 + talkback = deriving.unboxStart(signal); 160 + } else { 161 + signals.push(signal); 162 + if (deriving.isEnd(signal)) ending++; 163 + } 164 + }; 165 + 166 + operator(source)(sink); 167 + 168 + // When pushing a value we expect an immediate Push then End signal 169 + talkback(deriving.pull); 170 + jest.runAllTimers(); 171 + expect(ending).toBe(1); 172 + expect(signals).toEqual([deriving.push(result), deriving.end()]); 173 + // Also no additional pull event should be created by the operator 174 + expect(pulls).toBe(1); 175 + }); 176 + 177 + /* This tests a noop operator for End signals from the source 178 + after the first pull in response to another. 179 + This is similar to passesSourceEnd but more well behaved since 180 + mergeMap/switchMap/concatMap are eager operators. */ 181 + const passesSourcePushThenEnd = ( 182 + operator: types.operatorT<any, any>, 183 + result: any = 0 184 + ) => 141 185 it('passes on End signals from source (spec)', () => { 142 186 const signals = []; 143 187 let talkback = null; ··· 147 191 const source: types.sourceT<any> = sink => { 148 192 sink(deriving.start(tb => { 149 193 expect(tb).not.toBe(deriving.close); 150 - if (tb === deriving.pull) pulls++; 151 - if (pulls === 1) { 152 - sink(deriving.push(0)); 153 - sink(deriving.end()); 194 + if (tb === deriving.pull) { 195 + pulls++; 196 + if (pulls <= 2) { sink(deriving.push(0)); } 197 + else { sink(deriving.end()); } 154 198 } 155 199 })); 156 200 }; ··· 160 204 talkback = deriving.unboxStart(signal); 161 205 } else { 162 206 signals.push(signal); 207 + if (deriving.isPush(signal)) talkback(deriving.pull); 163 208 if (deriving.isEnd(signal)) ending++; 164 209 } 165 210 }; ··· 169 214 // When pushing a value we expect an immediate Push then End signal 170 215 talkback(deriving.pull); 171 216 jest.runAllTimers(); 172 - expect(pulls).toBe(1); 173 217 expect(ending).toBe(1); 174 - expect(signals).toEqual([deriving.push(result), deriving.end()]); 218 + expect(pulls).toBe(3); 219 + expect(signals).toEqual([ 220 + deriving.push(result), 221 + deriving.push(result), 222 + deriving.end() 223 + ]); 175 224 }); 176 225 177 226 /* This tests a noop operator for Start signals from the source. ··· 375 424 passesPassivePull(noop, [0]); 376 425 passesActivePush(noop, [0]); 377 426 passesSinkClose(noop); 378 - passesSourceEnd(noop, [0]); 427 + passesSourcePushThenEnd(noop, [0]); 379 428 passesSingleStart(noop); 380 429 passesStrictEnd(noop); 381 430 ··· 404 453 passesPassivePull(noop); 405 454 passesActivePush(noop); 406 455 passesSinkClose(noop); 407 - passesSourceEnd(noop); 456 + passesSourcePushThenEnd(noop); 408 457 passesSingleStart(noop); 409 458 passesStrictEnd(noop); 410 459 passesAsyncSequence(noop); ··· 431 480 ]); 432 481 }); 433 482 483 + // This synchronous test for concatMap will behave the same as mergeMap & switchMap 484 + it('lets inner sources finish when outer source ends', () => { 485 + const values = []; 486 + const teardown = jest.fn(); 487 + const fn = (signal: types.signalT<any>) => { 488 + values.push(signal); 489 + if (deriving.isStart(signal)) { 490 + deriving.unboxStart(signal)(deriving.pull); 491 + deriving.unboxStart(signal)(deriving.close); 492 + } 493 + }; 494 + 495 + operators.concatMap(() => { 496 + return sources.make(() => teardown); 497 + })(sources.fromValue(null))(fn); 498 + 499 + expect(teardown).toHaveBeenCalled(); 500 + expect(values).toEqual([ 501 + deriving.start(expect.any(Function)), 502 + ]); 503 + }); 504 + 434 505 // This asynchronous test for concatMap will behave differently than mergeMap & switchMap 435 506 it('emits values from each flattened asynchronous source, one at a time', () => { 436 507 const source = web.delay<number>(4)(sources.fromArray([1, 10])); ··· 455 526 [10], 456 527 [20], 457 528 ]); 529 + }); 530 + 531 + it('works for fully asynchronous sources', () => { 532 + const fn = jest.fn(); 533 + 534 + sinks.forEach(fn)( 535 + operators.concatMap(() => { 536 + return sources.make(observer => { 537 + setTimeout(() => observer.next(1)); 538 + return () => {}; 539 + }) 540 + })(sources.fromValue(null)) 541 + ); 542 + 543 + jest.runAllTimers(); 544 + expect(fn).toHaveBeenCalledWith(1); 458 545 }); 459 546 460 547 it('emits synchronous values in order', () => { ··· 583 670 passesPassivePull(noop); 584 671 passesActivePush(noop); 585 672 passesSinkClose(noop); 586 - passesSourceEnd(noop); 673 + passesSourcePushThenEnd(noop); 587 674 passesSingleStart(noop); 588 675 passesStrictEnd(noop); 589 676 passesAsyncSequence(noop); ··· 599 686 next(3); 600 687 complete(); 601 688 602 - expect(fn).toHaveBeenCalledTimes(6); 603 689 expect(fn.mock.calls).toEqual([ 604 690 [deriving.start(expect.any(Function))], 605 691 [deriving.push(1)], ··· 610 696 ]); 611 697 }); 612 698 699 + // This synchronous test for mergeMap will behave the same as concatMap & switchMap 700 + it('lets inner sources finish when outer source ends', () => { 701 + const values = []; 702 + const teardown = jest.fn(); 703 + const fn = (signal: types.signalT<any>) => { 704 + values.push(signal); 705 + if (deriving.isStart(signal)) { 706 + deriving.unboxStart(signal)(deriving.pull); 707 + deriving.unboxStart(signal)(deriving.close); 708 + } 709 + }; 710 + 711 + operators.mergeMap(() => { 712 + return sources.make(() => teardown); 713 + })(sources.fromValue(null))(fn); 714 + 715 + expect(teardown).toHaveBeenCalled(); 716 + expect(values).toEqual([ 717 + deriving.start(expect.any(Function)), 718 + ]); 719 + }); 720 + 613 721 // This asynchronous test for mergeMap will behave differently than concatMap & switchMap 614 722 it('emits values from each flattened asynchronous source simultaneously', () => { 615 723 const source = web.delay<number>(4)(sources.fromArray([1, 10])); ··· 624 732 jest.runAllTimers(); 625 733 expect(fn.mock.calls).toEqual([ 626 734 [1], 627 - [2], 628 735 [10], 736 + [2], 629 737 [20], 630 738 ]); 631 739 }); ··· 729 837 passesPassivePull(noop); 730 838 passesActivePush(noop); 731 839 passesSinkClose(noop); 732 - passesSourceEnd(noop); 840 + passesSourcePushThenEnd(noop); 733 841 passesSingleStart(noop); 734 842 passesStrictEnd(noop); 735 843 ··· 880 988 passesPassivePull(noop); 881 989 passesActivePush(noop); 882 990 passesSinkClose(noop); 883 - passesSourceEnd(noop); 991 + passesSourcePushThenEnd(noop); 884 992 passesSingleStart(noop); 885 993 passesStrictEnd(noop); 886 994 passesAsyncSequence(noop); ··· 907 1015 ]); 908 1016 }); 909 1017 1018 + // This synchronous test for switchMap will behave the same as concatMap & mergeMap 1019 + it('lets inner sources finish when outer source ends', () => { 1020 + const values = []; 1021 + const teardown = jest.fn(); 1022 + const fn = (signal: types.signalT<any>) => { 1023 + values.push(signal); 1024 + if (deriving.isStart(signal)) { 1025 + deriving.unboxStart(signal)(deriving.pull); 1026 + deriving.unboxStart(signal)(deriving.close); 1027 + } 1028 + }; 1029 + 1030 + operators.switchMap(() => { 1031 + return sources.make(() => teardown); 1032 + })(sources.fromValue(null))(fn); 1033 + 1034 + expect(teardown).toHaveBeenCalled(); 1035 + expect(values).toEqual([ 1036 + deriving.start(expect.any(Function)), 1037 + ]); 1038 + }); 1039 + 910 1040 // This asynchronous test for switchMap will behave differently than concatMap & mergeMap 911 1041 it('emits values from each flattened asynchronous source, one at a time', () => { 912 1042 const source = web.delay<number>(4)(sources.fromArray([1, 10])); ··· 960 1090 passesPassivePull(noop); 961 1091 passesActivePush(noop); 962 1092 passesSinkClose(noop); 963 - passesSourceEnd(noop); 1093 + passesSourcePushThenEnd(noop); 964 1094 passesSingleStart(noop); 965 1095 passesStrictEnd(noop); 966 1096 passesAsyncSequence(noop);
+7
src/wonka_sinks.test.ts
··· 1 1 import * as deriving from './helpers/wonka_deriving'; 2 2 import * as sinks from './wonka_sinks.gen'; 3 + import * as sources from './wonka_sources.gen'; 3 4 import * as web from './web/wonkaJs.gen'; 4 5 import * as types from './wonka_types.gen'; 5 6 ··· 198 199 199 200 await promise; 200 201 expect(fn).toHaveBeenCalledWith(1); 202 + }); 203 + 204 + it('creates a Promise for synchronous sources', async () => { 205 + const fn = jest.fn(); 206 + await web.toPromise(sources.fromArray([1, 2, 3])).then(fn); 207 + expect(fn).toHaveBeenCalledWith(3); 201 208 }); 202 209 }); 203 210
+13 -15
src/wonka_sources.re
··· 63 63 curry(sink => { 64 64 let state: makeStateT = {teardown: (.) => (), ended: false}; 65 65 66 - sink(. 67 - Start( 68 - (. signal) => 69 - switch (signal) { 70 - | Close when !state.ended => 71 - state.ended = true; 72 - state.teardown(.); 73 - | _ => () 74 - }, 75 - ), 76 - ); 77 - 78 66 state.teardown = 79 67 f(. { 80 68 next: value => ··· 87 75 sink(. End); 88 76 }, 89 77 }); 78 + 79 + sink(. 80 + Start( 81 + (. signal) => 82 + switch (signal) { 83 + | Close when !state.ended => 84 + state.ended = true; 85 + state.teardown(.); 86 + | _ => () 87 + }, 88 + ), 89 + ); 90 90 }); 91 91 92 92 type subjectState('a) = { ··· 137 137 (. signal) => { 138 138 switch (signal) { 139 139 | Close => ended := true 140 + | Pull when ! ended^ => sink(. End) 140 141 | _ => () 141 142 } 142 143 }, 143 144 ), 144 145 ); 145 - if (! ended^) { 146 - sink(. End); 147 - }; 148 146 }; 149 147 150 148 [@genType]
+88
src/wonka_sources.test.ts
··· 1 1 import * as deriving from './helpers/wonka_deriving'; 2 2 import * as sources from './wonka_sources.gen'; 3 + import * as operators from './wonka_operators.gen'; 3 4 import * as types from './wonka_types.gen'; 4 5 import * as web from './web/wonkaJs.gen'; 5 6 ··· 132 133 deriving.start(expect.any(Function)), 133 134 deriving.push(1), 134 135 deriving.end() 136 + ]); 137 + }); 138 + }); 139 + 140 + describe('merge', () => { 141 + const source = operators.merge<any>([ 142 + sources.fromValue(0), 143 + sources.empty 144 + ]); 145 + 146 + passesColdPull(source); 147 + passesActiveClose(source); 148 + 149 + it('correctly merges two sources where the second is empty', () => { 150 + const source = operators.merge<any>([ 151 + sources.fromValue(0), 152 + sources.empty 153 + ]); 154 + 155 + expect(collectSignals(source)).toEqual([ 156 + deriving.start(expect.any(Function)), 157 + deriving.push(0), 158 + deriving.end(), 159 + ]); 160 + }); 161 + 162 + it('correctly merges hot sources', () => { 163 + const onStart = jest.fn(); 164 + const source = operators.merge<any>([ 165 + operators.onStart(onStart)(sources.never), 166 + operators.onStart(onStart)(sources.fromArray([1, 2])), 167 + ]); 168 + 169 + const signals = collectSignals(source); 170 + expect(onStart).toHaveBeenCalledTimes(2); 171 + 172 + expect(signals).toEqual([ 173 + deriving.start(expect.any(Function)), 174 + deriving.push(1), 175 + deriving.push(2), 176 + ]); 177 + }); 178 + 179 + it('correctly merges asynchronous sources', () => { 180 + jest.useFakeTimers(); 181 + 182 + const onStart = jest.fn(); 183 + const source = operators.merge<any>([ 184 + operators.onStart(onStart)(sources.fromValue(-1)), 185 + operators.onStart(onStart)( 186 + operators.take(2)(web.interval(50)) 187 + ), 188 + ]); 189 + 190 + const signals = collectSignals(source); 191 + jest.advanceTimersByTime(100); 192 + expect(onStart).toHaveBeenCalledTimes(2); 193 + 194 + expect(signals).toEqual([ 195 + deriving.start(expect.any(Function)), 196 + deriving.push(-1), 197 + deriving.push(0), 198 + deriving.push(1), 199 + deriving.end(), 200 + ]); 201 + }); 202 + }); 203 + 204 + describe('concat', () => { 205 + const source = operators.concat<any>([ 206 + sources.fromValue(0), 207 + sources.empty 208 + ]); 209 + 210 + passesColdPull(source); 211 + passesActiveClose(source); 212 + 213 + it('correctly concats two sources where the second is empty', () => { 214 + const source = operators.concat<any>([ 215 + sources.fromValue(0), 216 + sources.empty 217 + ]); 218 + 219 + expect(collectSignals(source)).toEqual([ 220 + deriving.start(expect.any(Function)), 221 + deriving.push(0), 222 + deriving.end(), 135 223 ]); 136 224 }); 137 225 });