Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add JS/Web operators

+147
+89
src/operators.ts
··· 794 794 }; 795 795 } 796 796 797 + export function debounce<T>(timing: (value: T) => number): Operator<T, T> { 798 + return (source) => (sink) => { 799 + let id: any | void; 800 + let deferredEnded = false; 801 + let ended = false; 802 + source((signal) => { 803 + if (ended) { 804 + /*noop*/ 805 + } else if (signal === SignalKind.End) { 806 + ended = true; 807 + if (id) { 808 + deferredEnded = true; 809 + } else { 810 + sink(SignalKind.End); 811 + } 812 + } else if (signal.tag === SignalKind.Start) { 813 + const talkback = signal[0]; 814 + sink(start((signal) => { 815 + if (signal === TalkbackKind.Close && !ended) { 816 + ended = true; 817 + deferredEnded = false; 818 + if (id) clearTimeout(id); 819 + talkback(TalkbackKind.Close); 820 + } else if (!ended) { 821 + talkback(TalkbackKind.Pull); 822 + } 823 + })); 824 + } else { 825 + if (id) clearTimeout(id); 826 + id = setTimeout(() => { 827 + id = undefined; 828 + sink(signal); 829 + if (deferredEnded) sink(SignalKind.End); 830 + }, timing(signal[0])); 831 + } 832 + }); 833 + }; 834 + } 835 + 836 + export function delay<T>(wait: number): Operator<T, T> { 837 + return (source) => (sink) => { 838 + let active = 0; 839 + source((signal) => { 840 + if (typeof signal !== 'number' && signal.tag === SignalKind.Start) { 841 + sink(signal); 842 + } else { 843 + active++; 844 + setTimeout(() => { 845 + if (active) { 846 + active--; 847 + sink(signal); 848 + } 849 + }, wait); 850 + } 851 + }); 852 + }; 853 + } 854 + 855 + export function throttle<T>(timing: (value: T) => number): Operator<T, T> { 856 + return (source) => (sink) => { 857 + let skip = false; 858 + let id: any | void; 859 + source((signal) => { 860 + if (signal === SignalKind.End) { 861 + if (id) clearTimeout(id); 862 + sink(SignalKind.End); 863 + } else if (signal.tag === SignalKind.Start) { 864 + const talkback = signal[0]; 865 + sink(start((signal) => { 866 + if (signal === TalkbackKind.Close) { 867 + if (id) clearTimeout(id); 868 + talkback(TalkbackKind.Close); 869 + } else { 870 + talkback(TalkbackKind.Pull); 871 + } 872 + })); 873 + } else if (!skip) { 874 + skip = true; 875 + if (id) clearTimeout(id); 876 + id = setTimeout(() => { 877 + id = undefined; 878 + skip = false; 879 + }, timing(signal[0])); 880 + sink(signal); 881 + } 882 + }); 883 + }; 884 + } 885 + 797 886 export { 798 887 mergeAll as flatten, 799 888 onPush as tap,
+17
src/sinks.ts
··· 54 54 }); 55 55 return values; 56 56 } 57 + 58 + export function toPromise<T>(source: Source<T>): Promise<T> { 59 + return new Promise(resolve => { 60 + let talkback = talkbackPlaceholder; 61 + let value: T | void; 62 + source((signal) => { 63 + if (signal === SignalKind.End) { 64 + resolve(value!); 65 + } else if (signal.tag === SignalKind.Start) { 66 + (talkback = signal[0])(TalkbackKind.Pull); 67 + } else { 68 + value = signal[0]; 69 + talkback(TalkbackKind.Pull); 70 + } 71 + }); 72 + }) 73 + }
+41
src/sources.ts
··· 111 111 export const never: Source<any> = (sink: Sink<any>): void => { 112 112 sink(start(talkbackPlaceholder)); 113 113 }; 114 + 115 + export function interval(ms: number): Source<number> { 116 + return (sink) => { 117 + let i = 0; 118 + const id = setInterval(() => { 119 + sink(push(i++)); 120 + }, ms); 121 + sink(start((signal) => { 122 + if (signal === TalkbackKind.Close) 123 + clearInterval(id); 124 + })); 125 + }; 126 + } 127 + 128 + export function fromDomEvent(element: HTMLElement, event: string): Source<Event> { 129 + return (sink) => { 130 + const handler = (payload: Event) => { 131 + sink(push(payload)); 132 + }; 133 + sink(start((signal) => { 134 + if (signal === TalkbackKind.Close) 135 + element.removeEventListener(event, handler); 136 + })); 137 + element.addEventListener(event, handler); 138 + }; 139 + } 140 + 141 + export function fromPromise<T>(promise: Promise<T>): Source<T> { 142 + return (sink) => { 143 + let ended = false; 144 + promise.then((value) => { 145 + if (!ended) { 146 + sink(push(value)); 147 + sink(SignalKind.End); 148 + } 149 + }); 150 + sink(start((signal) => { 151 + if (signal === TalkbackKind.Close) ended = true; 152 + })); 153 + }; 154 + }