Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Remove captureTalkback helper and add takeLast tests (#57)

* Remove captureTalkback from filter operator

* Remove captureTalkback from skip operator

* Remove captureTalkback from skipWhile operator

* Refactor takeLast, add tests, and remove captureTalkback

* Delete captureTalkback helper

authored by

Phil Plückthun and committed by
GitHub
470eeb51 2b4d0fa2

+79 -37
-17
src/helpers/wonka_helpers.re
··· 2 2 3 3 let talkbackPlaceholder = (. _: talkbackT) => (); 4 4 5 - let captureTalkback = 6 - ( 7 - source: sourceT('a), 8 - sinkWithTalkback: (. signalT('a), (. talkbackT) => unit) => unit, 9 - ) => { 10 - let talkback = ref(talkbackPlaceholder); 11 - 12 - source((. signal) => { 13 - switch (signal) { 14 - | Start(x) => talkback := x 15 - | _ => () 16 - }; 17 - 18 - sinkWithTalkback(. signal, talkback^); 19 - }); 20 - }; 21 - 22 5 type trampolineT = { 23 6 mutable ended: bool, 24 7 mutable looping: bool,
+49 -20
src/wonka_operators.re
··· 287 287 [@genType] 288 288 let filter = (f: (. 'a) => bool): operatorT('a, 'a) => 289 289 curry(source => 290 - curry(sink => 291 - captureTalkback(source, (. signal, talkback) => 290 + curry(sink => { 291 + let talkback = ref(talkbackPlaceholder); 292 + 293 + source((. signal) => 292 294 switch (signal) { 293 - | Push(x) when !f(. x) => talkback(. Pull) 295 + | Start(tb) => 296 + talkback := tb; 297 + sink(. signal); 298 + | Push(x) when !f(. x) => talkback^(. Pull) 294 299 | _ => sink(. signal) 295 300 } 296 - ) 297 - ) 301 + ); 302 + }) 298 303 ); 299 304 300 305 [@genType] ··· 304 309 source((. signal) => 305 310 sink(. 306 311 switch (signal) { 307 - | Start(x) => Start(x) 308 312 | Push(x) => Push(f(. x)) 309 - | End => End 313 + | _ => signal 310 314 }, 311 315 ) 312 316 ) ··· 606 610 }; 607 611 }; 608 612 613 + type skipStateT = { 614 + mutable talkback: (. talkbackT) => unit, 615 + mutable rest: int, 616 + }; 617 + 609 618 [@genType] 610 619 let skip = (wait: int): operatorT('a, 'a) => 611 620 curry(source => 612 621 curry(sink => { 613 - let rest = ref(wait); 622 + let state: skipStateT = {talkback: talkbackPlaceholder, rest: wait}; 614 623 615 - captureTalkback(source, (. signal, talkback) => 624 + source((. signal) => 616 625 switch (signal) { 617 - | Push(_) when rest^ > 0 => 618 - rest := rest^ - 1; 619 - talkback(. Pull); 626 + | Start(tb) => 627 + state.talkback = tb; 628 + sink(. signal); 629 + | Push(_) when state.rest > 0 => 630 + state.rest = state.rest - 1; 631 + state.talkback(. Pull); 620 632 | _ => sink(. signal) 621 633 } 622 634 ); ··· 700 712 }) 701 713 ); 702 714 715 + type skipWhileStateT = { 716 + mutable talkback: (. talkbackT) => unit, 717 + mutable skip: bool, 718 + }; 719 + 703 720 [@genType] 704 721 let skipWhile = (f: (. 'a) => bool): operatorT('a, 'a) => 705 722 curry(source => 706 723 curry(sink => { 707 - let skip = ref(true); 724 + let state: skipWhileStateT = { 725 + talkback: talkbackPlaceholder, 726 + skip: true, 727 + }; 708 728 709 - captureTalkback(source, (. signal, talkback) => 729 + source((. signal) => 710 730 switch (signal) { 711 - | Push(x) when skip^ => 731 + | Start(tb) => 732 + state.talkback = tb; 733 + sink(. signal); 734 + | Push(x) when state.skip => 712 735 if (f(. x)) { 713 - talkback(. Pull); 736 + state.talkback(. Pull); 714 737 } else { 715 - skip := false; 738 + state.skip = false; 716 739 sink(. signal); 717 740 } 718 741 | _ => sink(. signal) ··· 885 908 curry(source => 886 909 curry(sink => { 887 910 open Rebel; 911 + let talkback = ref(talkbackPlaceholder); 888 912 let queue = MutableQueue.make(); 889 913 890 - captureTalkback(source, (. signal, talkback) => 914 + source((. signal) => 891 915 switch (signal) { 892 - | Start(_) => talkback(. Pull) 916 + | Start(tb) when max <= 0 => 917 + tb(. Close); 918 + Wonka_sources.empty(sink); 919 + | Start(tb) => 920 + talkback := tb; 921 + tb(. Pull); 893 922 | Push(x) => 894 923 let size = MutableQueue.size(queue); 895 924 if (size >= max && max > 0) { ··· 897 926 }; 898 927 899 928 MutableQueue.add(queue, x); 900 - talkback(. Pull); 929 + talkback^(. Pull); 901 930 | End => makeTrampoline(sink, (.) => MutableQueue.pop(queue)) 902 931 } 903 932 );
+30
src/wonka_operators.test.ts
··· 990 990 }); 991 991 }); 992 992 993 + describe('takeLast', () => { 994 + passesCloseAndEnd(operators.takeLast(0)); 995 + 996 + it('emits the last max values of an ended source', () => { 997 + const { source, next, complete } = sources.makeSubject<number>(); 998 + const values = []; 999 + 1000 + let talkback; 1001 + operators.takeLast(1)(source)(signal => { 1002 + values.push(signal); 1003 + if (deriving.isStart(signal)) 1004 + talkback = deriving.unboxStart(signal); 1005 + if (!deriving.isEnd(signal)) 1006 + talkback(deriving.pull); 1007 + }); 1008 + 1009 + next(1); 1010 + next(2); 1011 + 1012 + expect(values.length).toBe(0); 1013 + complete(); 1014 + 1015 + expect(values).toEqual([ 1016 + deriving.start(expect.any(Function)), 1017 + deriving.push(2), 1018 + deriving.end(), 1019 + ]); 1020 + }); 1021 + }); 1022 + 993 1023 describe('throttle', () => { 994 1024 const noop = web.throttle(() => 0); 995 1025 passesPassivePull(noop);