Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix sample operator not respecting Pull (#53)

authored by

Phil Plückthun and committed by
GitHub
c6c30510 4213aaa0

+39 -22
+26 -16
src/wonka_operators.re
··· 478 478 ); 479 479 480 480 type sampleStateT('a) = { 481 - mutable ended: bool, 482 - mutable value: option('a), 483 481 mutable sourceTalkback: (. talkbackT) => unit, 484 482 mutable notifierTalkback: (. talkbackT) => unit, 483 + mutable value: option('a), 484 + mutable pulled: bool, 485 + mutable ended: bool, 485 486 }; 486 487 487 488 [@genType] ··· 489 490 curry(source => 490 491 curry(sink => { 491 492 let state = { 493 + sourceTalkback: talkbackPlaceholder, 494 + notifierTalkback: talkbackPlaceholder, 495 + value: None, 496 + pulled: false, 492 497 ended: false, 493 - value: None, 494 - sourceTalkback: (. _: talkbackT) => (), 495 - notifierTalkback: (. _: talkbackT) => (), 496 498 }; 497 499 498 500 source((. signal) => 499 501 switch (signal) { 500 502 | Start(tb) => state.sourceTalkback = tb 501 - | End => 503 + | Push(x) => 504 + state.value = Some(x); 505 + state.notifierTalkback(. Pull); 506 + | End when !state.ended => 502 507 state.ended = true; 503 508 state.notifierTalkback(. Close); 504 509 sink(. End); 505 - | Push(x) => state.value = Some(x) 510 + | End => () 506 511 } 507 512 ); 508 513 509 514 notifier((. signal) => 510 515 switch (signal, state.value) { 511 516 | (Start(tb), _) => state.notifierTalkback = tb 512 - | (End, _) => 517 + | (End, _) when !state.ended => 513 518 state.ended = true; 514 519 state.sourceTalkback(. Close); 515 520 sink(. End); 521 + | (End, _) => () 516 522 | (Push(_), Some(x)) when !state.ended => 517 523 state.value = None; 518 524 sink(. Push(x)); ··· 523 529 sink(. 524 530 Start( 525 531 (. signal) => 526 - switch (signal) { 527 - | Pull => 528 - state.sourceTalkback(. Pull); 529 - state.notifierTalkback(. Pull); 530 - | Close => 531 - state.ended = true; 532 - state.sourceTalkback(. Close); 533 - state.notifierTalkback(. Close); 532 + if (!state.ended) { 533 + switch (signal) { 534 + | Pull when !state.pulled => 535 + state.pulled = true; 536 + state.sourceTalkback(. Pull); 537 + state.notifierTalkback(. Pull); 538 + | Pull => () 539 + | Close => 540 + state.ended = true; 541 + state.sourceTalkback(. Close); 542 + state.notifierTalkback(. Close); 543 + }; 534 544 }, 535 545 ), 536 546 );
+13 -6
src/wonka_operators.test.ts
··· 692 692 }); 693 693 694 694 describe('sample', () => { 695 - const noop = operators.sample(sources.fromValue(null)); 696 - // TODO: passesPassivePull(noop); 697 - // TODO: passesActivePush(noop); 698 - // TODO: passesSinkClose(noop); 699 - // TODO: passesSourceEnd(noop); 695 + const valueThenNever: types.sourceT<any> = sink => 696 + sink(deriving.start(tb => { 697 + if (tb === deriving.pull) 698 + sink(deriving.push(null)); 699 + })); 700 + 701 + const noop = operators.sample(valueThenNever); 702 + 703 + passesPassivePull(noop); 704 + passesActivePush(noop); 705 + passesSinkClose(noop); 706 + passesSourceEnd(noop); 700 707 passesSingleStart(noop); 701 - // TODO: passesStrictEnd(noop); 708 + passesStrictEnd(noop); 702 709 703 710 it('emits the latest value when a notifier source emits', () => { 704 711 const { source: notifier$, next: notify } = sources.makeSubject();