Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix cold sources for {concat,switch,merge}Map (#52)

* Fix switchMap pulling for cold sources

The pulling behaviour wouldn't take cold sources
into account, which could lead to values being accepted
in the incorrect order.

* Fix passesActivePush for switchMap

* Fix mergeMap pulling for cold sources

* Fix state.ended being set for Close in mergeMap

* Fix concatMap pulling for cold sources

* Remove redundant state.closed from concatMap

authored by

Phil Plückthun and committed by
GitHub
4213aaa0 fdbba03f

+155 -102
+112 -85
src/wonka_operators.re
··· 178 178 type concatMapStateT('a) = { 179 179 inputQueue: Rebel.MutableQueue.t('a), 180 180 mutable outerTalkback: (. talkbackT) => unit, 181 + mutable outerPulled: bool, 181 182 mutable innerTalkback: (. talkbackT) => unit, 182 183 mutable innerActive: bool, 183 - mutable closed: bool, 184 + mutable innerPulled: bool, 184 185 mutable ended: bool, 185 186 }; 186 187 ··· 191 192 let state: concatMapStateT('a) = { 192 193 inputQueue: Rebel.MutableQueue.make(), 193 194 outerTalkback: talkbackPlaceholder, 195 + outerPulled: false, 194 196 innerTalkback: talkbackPlaceholder, 195 197 innerActive: false, 196 - closed: false, 198 + innerPulled: false, 197 199 ended: false, 198 200 }; 199 201 200 202 let rec applyInnerSource = innerSource => 201 203 innerSource((. signal) => 202 204 switch (signal) { 203 - | End => 205 + | Start(tb) => 206 + state.innerActive = true; 207 + state.innerTalkback = tb; 208 + state.innerPulled = false; 209 + tb(. Pull); 210 + | Push(_) when state.innerActive => 211 + sink(. signal); 212 + if (!state.innerPulled) { 213 + state.innerTalkback(. Pull); 214 + } else { 215 + state.innerPulled = false; 216 + }; 217 + | Push(_) => () 218 + | End when state.innerActive => 204 219 state.innerActive = false; 205 - state.innerTalkback = talkbackPlaceholder; 206 - 207 220 switch (Rebel.MutableQueue.pop(state.inputQueue)) { 208 221 | Some(input) => applyInnerSource(f(. input)) 209 222 | None when state.ended => sink(. End) 210 223 | None => () 211 224 }; 212 - | Start(tb) => 213 - state.innerActive = true; 214 - state.innerTalkback = tb; 215 - tb(. Pull); 216 - | Push(x) when !state.closed => 217 - sink(. Push(x)); 218 - state.innerTalkback(. Pull); 219 - | Push(_) => () 225 + | End => () 220 226 } 221 227 ); 222 228 223 229 source((. signal) => 224 230 switch (signal) { 225 - | End when !state.ended => 226 - state.ended = true; 227 - if (!state.innerActive 228 - && Rebel.MutableQueue.isEmpty(state.inputQueue)) { 229 - sink(. End); 230 - }; 231 - | End => () 232 - | Start(tb) => 233 - state.outerTalkback = tb; 234 - tb(. Pull); 231 + | Start(tb) => state.outerTalkback = tb 235 232 | Push(x) when !state.ended => 233 + if (!state.outerPulled) { 234 + state.outerPulled = true; 235 + state.outerTalkback(. Pull); 236 + } else { 237 + state.outerPulled = false; 238 + }; 239 + 236 240 if (state.innerActive) { 237 241 Rebel.MutableQueue.add(state.inputQueue, x); 238 242 } else { 239 243 applyInnerSource(f(. x)); 240 244 }; 241 - 242 - state.outerTalkback(. Pull); 243 245 | Push(_) => () 246 + | End when !state.ended => 247 + state.ended = true; 248 + if (!state.innerActive 249 + && Rebel.MutableQueue.isEmpty(state.inputQueue)) { 250 + sink(. End); 251 + }; 252 + | End => () 244 253 } 245 254 ); 246 255 ··· 249 258 (. signal) => 250 259 switch (signal) { 251 260 | Pull => 252 - if (!state.ended) { 261 + if (!state.ended && !state.outerPulled) { 262 + state.outerPulled = true; 263 + state.outerTalkback(. Pull); 264 + }; 265 + if (state.innerActive && !state.innerPulled) { 266 + state.innerPulled = true; 253 267 state.innerTalkback(. Pull); 254 - } 255 - | Close => 256 - state.innerTalkback(. Close); 257 - if (!state.ended) { 258 - state.ended = true; 259 - state.closed = true; 260 - state.outerTalkback(. Close); 261 - state.innerTalkback = talkbackPlaceholder; 262 268 }; 269 + | Close when !state.ended => 270 + state.ended = true; 271 + state.innerActive = false; 272 + state.innerTalkback(. Close); 273 + state.outerTalkback(. Close); 274 + | Close => () 263 275 }, 264 276 ), 265 277 ); ··· 305 317 306 318 type mergeMapStateT = { 307 319 mutable outerTalkback: (. talkbackT) => unit, 320 + mutable outerPulled: bool, 308 321 mutable innerTalkbacks: Rebel.Array.t((. talkbackT) => unit), 309 322 mutable ended: bool, 310 323 }; ··· 315 328 curry(sink => { 316 329 let state: mergeMapStateT = { 317 330 outerTalkback: talkbackPlaceholder, 331 + outerPulled: false, 318 332 innerTalkbacks: Rebel.Array.makeEmpty(), 319 333 ended: false, 320 334 }; ··· 345 359 346 360 source((. signal) => 347 361 switch (signal) { 362 + | Start(tb) => state.outerTalkback = tb 363 + | Push(x) when !state.ended => 364 + state.outerPulled = false; 365 + applyInnerSource(f(. x)); 366 + | Push(_) => () 348 367 | End when !state.ended => 349 368 state.ended = true; 350 369 if (Rebel.Array.size(state.innerTalkbacks) === 0) { 351 370 sink(. End); 352 371 }; 353 372 | End => () 354 - | Start(tb) => 355 - state.outerTalkback = tb; 356 - tb(. Pull); 357 - | Push(x) when !state.ended => 358 - applyInnerSource(f(. x)); 359 - state.outerTalkback(. Pull); 360 - | Push(_) => () 361 373 } 362 374 ); 363 375 ··· 365 377 Start( 366 378 (. signal) => 367 379 switch (signal) { 368 - | Close => 369 - Rebel.Array.forEach(state.innerTalkbacks, talkback => 370 - talkback(. Close) 371 - ); 372 - if (!state.ended) { 373 - state.ended = true; 374 - state.outerTalkback(. Close); 375 - Rebel.Array.forEach(state.innerTalkbacks, talkback => 376 - talkback(. Close) 377 - ); 378 - state.innerTalkbacks = Rebel.Array.makeEmpty(); 379 - }; 380 + | Close when !state.ended => 381 + let tbs = state.innerTalkbacks; 382 + state.innerTalkbacks = Rebel.Array.makeEmpty(); 383 + state.outerTalkback(. signal); 384 + Rebel.Array.forEach(tbs, tb => tb(. signal)); 385 + | Close => () 380 386 | Pull when !state.ended => 381 - Rebel.Array.forEach(state.innerTalkbacks, talkback => 382 - talkback(. Pull) 383 - ) 387 + if (!state.outerPulled) { 388 + state.outerPulled = true; 389 + state.outerTalkback(. Pull); 390 + }; 391 + 392 + Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. Pull)); 384 393 | Pull => () 385 394 }, 386 395 ), ··· 710 719 711 720 type switchMapStateT('a) = { 712 721 mutable outerTalkback: (. talkbackT) => unit, 722 + mutable outerPulled: bool, 713 723 mutable innerTalkback: (. talkbackT) => unit, 714 724 mutable innerActive: bool, 715 - mutable closed: bool, 725 + mutable innerPulled: bool, 716 726 mutable ended: bool, 717 727 }; 718 728 ··· 722 732 curry(sink => { 723 733 let state: switchMapStateT('a) = { 724 734 outerTalkback: talkbackPlaceholder, 735 + outerPulled: false, 725 736 innerTalkback: talkbackPlaceholder, 726 737 innerActive: false, 727 - closed: false, 738 + innerPulled: false, 728 739 ended: false, 729 740 }; 730 741 731 742 let applyInnerSource = innerSource => 732 743 innerSource((. signal) => 733 744 switch (signal) { 734 - | End => 735 - state.innerActive = false; 736 - state.innerTalkback = talkbackPlaceholder; 737 - if (state.ended) { 738 - sink(. End); 739 - }; 740 745 | Start(tb) => 741 746 state.innerActive = true; 742 747 state.innerTalkback = tb; 748 + state.innerPulled = false; 743 749 tb(. Pull); 744 - | Push(x) when !state.closed => 745 - sink(. Push(x)); 746 - state.innerTalkback(. Pull); 750 + | Push(_) when state.innerActive => 751 + sink(. signal); 752 + if (!state.innerPulled) { 753 + state.innerTalkback(. Pull); 754 + } else { 755 + state.innerPulled = false; 756 + }; 747 757 | Push(_) => () 758 + | End when state.innerActive => 759 + state.innerActive = false; 760 + if (state.ended) { 761 + sink(. signal); 762 + }; 763 + | End => () 748 764 } 749 765 ); 750 766 751 767 source((. signal) => 752 768 switch (signal) { 753 - | End when !state.ended => 754 - state.ended = true; 755 - if (!state.innerActive) { 756 - sink(. End); 757 - }; 758 - | End => () 759 - | Start(tb) => 760 - state.outerTalkback = tb; 761 - tb(. Pull); 769 + | Start(tb) => state.outerTalkback = tb 762 770 | Push(x) when !state.ended => 763 771 if (state.innerActive) { 764 772 state.innerTalkback(. Close); 765 773 state.innerTalkback = talkbackPlaceholder; 766 774 }; 775 + 776 + if (!state.outerPulled) { 777 + state.outerPulled = true; 778 + state.outerTalkback(. Pull); 779 + } else { 780 + state.outerPulled = false; 781 + }; 782 + 767 783 applyInnerSource(f(. x)); 768 - state.outerTalkback(. Pull); 769 784 | Push(_) => () 785 + | End when !state.ended => 786 + state.ended = true; 787 + if (!state.innerActive) { 788 + sink(. End); 789 + }; 790 + | End => () 770 791 } 771 792 ); 772 793 ··· 774 795 Start( 775 796 (. signal) => 776 797 switch (signal) { 777 - | Pull => state.innerTalkback(. Pull) 778 - | Close => 779 - state.innerTalkback(. Close); 780 - if (!state.ended) { 781 - state.ended = true; 782 - state.closed = true; 783 - state.outerTalkback(. Close); 784 - state.innerTalkback = talkbackPlaceholder; 798 + | Pull => 799 + if (!state.ended && !state.outerPulled) { 800 + state.outerPulled = true; 801 + state.outerTalkback(. Pull); 802 + }; 803 + if (state.innerActive && !state.innerPulled) { 804 + state.innerPulled = true; 805 + state.innerTalkback(. Pull); 785 806 }; 807 + | Close when !state.ended => 808 + state.ended = true; 809 + state.innerActive = false; 810 + state.innerTalkback(. Close); 811 + state.outerTalkback(. Close); 812 + | Close => () 786 813 }, 787 814 ), 788 815 );
+43 -17
src/wonka_operators.test.ts
··· 374 374 375 375 describe('concatMap', () => { 376 376 const noop = operators.concatMap(x => sources.fromValue(x)); 377 - // TODO: passesPassivePull(noop); 378 - // TODO: passesActivePush(noop); 379 - // TODO: passesSinkClose(noop); 380 - // TODO: passesSourceEnd(noop); 377 + passesPassivePull(noop); 378 + passesActivePush(noop); 379 + passesSinkClose(noop); 380 + passesSourceEnd(noop); 381 381 passesSingleStart(noop); 382 382 passesStrictEnd(noop); 383 383 passesAsyncSequence(noop); ··· 428 428 [10], 429 429 [20], 430 430 ]); 431 + }); 432 + 433 + it('emits synchronous values in order', () => { 434 + const values = []; 435 + 436 + sinks.forEach(x => values.push(x))( 437 + operators.concat([ 438 + sources.fromArray([1, 2]), 439 + sources.fromArray([3, 4]) 440 + ]) 441 + ); 442 + 443 + expect(values).toEqual([ 1, 2, 3, 4 ]); 431 444 }); 432 445 }); 433 446 ··· 541 554 542 555 describe('mergeMap', () => { 543 556 const noop = operators.mergeMap(x => sources.fromValue(x)); 544 - // TODO: passesPassivePull(noop); 545 - // TODO: passesActivePush(noop); 546 - // TODO: passesSinkClose(noop); 547 - // TODO: passesSourceEnd(noop); 557 + passesPassivePull(noop); 558 + passesActivePush(noop); 559 + passesSinkClose(noop); 560 + passesSourceEnd(noop); 548 561 passesSingleStart(noop); 549 562 passesStrictEnd(noop); 550 563 passesAsyncSequence(noop); ··· 582 595 })(source) 583 596 ); 584 597 585 - jest.advanceTimersByTime(14); 598 + jest.runAllTimers(); 586 599 expect(fn.mock.calls).toEqual([ 587 600 [1], 588 - [10], 589 601 [2], 602 + [10], 590 603 [20], 591 604 ]); 605 + }); 606 + 607 + it('emits synchronous values in order', () => { 608 + const values = []; 609 + 610 + sinks.forEach(x => values.push(x))( 611 + operators.merge([ 612 + sources.fromArray([1, 2]), 613 + sources.fromArray([3, 4]) 614 + ]) 615 + ); 616 + 617 + expect(values).toEqual([ 1, 2, 3, 4 ]); 592 618 }); 593 619 }); 594 620 ··· 817 843 818 844 describe('switchMap', () => { 819 845 const noop = operators.switchMap(x => sources.fromValue(x)); 820 - // TODO: passesPassivePull(noop); 821 - // TODO: passesActivePush(noop); 822 - // TODO: passesSinkClose(noop); 823 - // TODO: passesSourceEnd(noop); 846 + passesPassivePull(noop); 847 + passesActivePush(noop); 848 + passesSinkClose(noop); 849 + passesSourceEnd(noop); 824 850 passesSingleStart(noop); 825 851 passesStrictEnd(noop); 826 852 passesAsyncSequence(noop); ··· 853 879 const fn = jest.fn(); 854 880 855 881 sinks.forEach(fn)( 856 - operators.switchMap((x: number) => { 857 - return web.delay(5)(sources.fromArray([x, x * 2])); 858 - })(source) 882 + operators.switchMap((x: number) => ( 883 + operators.take(2)(operators.map((y: number) => x * (y + 1))(web.interval(5))) 884 + ))(source) 859 885 ); 860 886 861 887 jest.runAllTimers();