Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add basic sink functions

+56
+56
src/sinks.ts
··· 1 + import { Source, Subscription, TalkbackKind, SignalKind } from './types' 2 + import { talkbackPlaceholder } from './helpers' 3 + 4 + export function subscribe<T>(subscriber: (value: T) => void) { 5 + return (source: Source<T>): Subscription => { 6 + let talkback = talkbackPlaceholder; 7 + let ended = false; 8 + source((signal) => { 9 + if (signal === SignalKind.End) { 10 + ended = true; 11 + } else if (signal.tag === SignalKind.Start) { 12 + (talkback = signal[0])(TalkbackKind.Pull); 13 + } else if (!ended) { 14 + subscriber(signal[0]); 15 + talkback(TalkbackKind.Pull); 16 + } 17 + }); 18 + return { 19 + unsubscribe() { 20 + if (!ended) { 21 + ended = true; 22 + talkback(TalkbackKind.Close); 23 + } 24 + }, 25 + } 26 + } 27 + } 28 + 29 + export function forEach<T>(subscriber: (value: T) => void) { 30 + return (source: Source<T>): void => { 31 + subscribe(subscriber)(source); 32 + }; 33 + } 34 + 35 + export function publish<T>() { 36 + return (source: Source<T>): void => { 37 + subscribe((_value) => {/*noop*/})(source); 38 + }; 39 + } 40 + 41 + export function toArray<T>(source: Source<T>): T[] { 42 + const values: T[] = []; 43 + let talkback = talkbackPlaceholder; 44 + let ended = false; 45 + source((signal) => { 46 + if (signal === SignalKind.End) { 47 + ended = true; 48 + } else if (signal.tag === SignalKind.Start) { 49 + (talkback = signal[0])(TalkbackKind.Pull); 50 + } else if (!ended) { 51 + values.push(signal[0]); 52 + talkback(TalkbackKind.Pull); 53 + } 54 + }); 55 + return values; 56 + }