Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix debounce emitting End after Close signal (#49)

* Fix debounce emitting End after Close

Since there was no difference between "ended"
and "closed" in the debounce operator, after
the timeout completed it would emit an End
event even if the source has been closed.

* Add test for debounced End

authored by

Phil Plückthun and committed by
GitHub
592c41ea dcbdaf50

+45 -19
+32 -18
src/web/wonkaJs.re
··· 12 12 13 13 /* operators */ 14 14 15 + type debounceStateT = { 16 + mutable id: option(Js.Global.timeoutId), 17 + mutable deferredEnded: bool, 18 + mutable ended: bool, 19 + }; 20 + 15 21 [@genType] 16 22 let debounce = (f: (. 'a) => int): operatorT('a, 'a) => 17 23 curry(source => 18 24 curry(sink => { 19 - let gotEndSignal = ref(false); 20 - let id: ref(option(Js.Global.timeoutId)) = ref(None); 25 + let state: debounceStateT = { 26 + id: None, 27 + deferredEnded: false, 28 + ended: false, 29 + }; 21 30 22 31 let clearTimeout = () => 23 - switch (id^) { 32 + switch (state.id) { 24 33 | Some(timeoutId) => 25 - id := None; 34 + state.id = None; 26 35 Js.Global.clearTimeout(timeoutId); 27 36 | None => () 28 37 }; ··· 33 42 sink(. 34 43 Start( 35 44 (. signal) => 36 - switch (signal) { 37 - | Close => 38 - clearTimeout(); 39 - tb(. Close); 40 - | _ => tb(. signal) 45 + if (!state.ended) { 46 + switch (signal) { 47 + | Close => 48 + state.ended = true; 49 + state.deferredEnded = false; 50 + clearTimeout(); 51 + tb(. Close); 52 + | Pull => tb(. Pull) 53 + }; 41 54 }, 42 55 ), 43 56 ) 44 - | Push(x) => 57 + | Push(x) when !state.ended => 45 58 clearTimeout(); 46 - id := 59 + state.id = 47 60 Some( 48 61 Js.Global.setTimeout( 49 62 () => { 50 - id := None; 63 + state.id = None; 51 64 sink(. signal); 52 - if (gotEndSignal^) { 65 + if (state.deferredEnded) { 53 66 sink(. End); 54 67 }; 55 68 }, 56 69 f(. x), 57 70 ), 58 71 ); 59 - | End => 60 - gotEndSignal := true; 61 - 62 - switch (id^) { 72 + | Push(_) => () 73 + | End when !state.ended => 74 + state.ended = true; 75 + switch (state.id) { 76 + | Some(_) => state.deferredEnded = true 63 77 | None => sink(. End) 64 - | _ => () 65 78 }; 79 + | End => () 66 80 } 67 81 ); 68 82 })
+13 -1
src/wonka_operators.test.ts
··· 430 430 passesSinkClose(noop); 431 431 passesSourceEnd(noop); 432 432 passesSingleStart(noop); 433 - // TODO: passesStrictEnd(noop); 433 + passesStrictEnd(noop); 434 434 passesAsyncSequence(noop); 435 435 436 436 it('waits for a specified amount of silence before emitting the last value', () => { ··· 449 449 450 450 jest.advanceTimersByTime(1); 451 451 expect(fn).toHaveBeenCalledWith(2); 452 + }); 453 + 454 + it('emits debounced value with delayed End signal', () => { 455 + const { source, next, complete } = sources.makeSubject<number>(); 456 + const fn = jest.fn(); 457 + 458 + sinks.forEach(fn)(web.debounce(() => 100)(source)); 459 + 460 + next(1); 461 + complete(); 462 + jest.advanceTimersByTime(100); 463 + expect(fn).toHaveBeenCalled(); 452 464 }); 453 465 }); 454 466