Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add callbag utilities

+52 -2
+50
src/callbag.ts
··· 1 + import { Source, SignalKind, TalkbackKind } from './types' 2 + import { push, start } from './helpers' 3 + 4 + interface Callbag<I, O> { 5 + (t: 0, d: Callbag<O, I>): void; 6 + (t: 1, d: I): void; 7 + (t: 2, d?: any): void; 8 + } 9 + 10 + export function fromCallbag<T>(callbag: Callbag<void, T>): Source<T> { 11 + return (sink) => { 12 + callbag(0, (signal: number, data: any) => { 13 + if (signal === 0) { 14 + sink(start((signal) => { 15 + if (signal === TalkbackKind.Pull) { 16 + data(1); 17 + } else { 18 + data(2); 19 + } 20 + })); 21 + } else if (signal === 1) { 22 + sink(push(data)); 23 + } else if (signal === 2) { 24 + sink(SignalKind.End); 25 + } 26 + }); 27 + }; 28 + } 29 + 30 + export function toCallbag<T>(source: Source<T>): Callbag<void, T> { 31 + return (signal: number, sink: any) => { 32 + if (signal === 0) { 33 + source((signal) => { 34 + if (signal === SignalKind.End) { 35 + sink(2); 36 + } else if (signal.tag === SignalKind.Start) { 37 + sink(0, (num: number) => { 38 + if (num === 1) { 39 + signal[0](TalkbackKind.Pull); 40 + } else if (num === 2) { 41 + signal[0](TalkbackKind.Close); 42 + } 43 + }); 44 + } else { 45 + sink(1, signal[0]); 46 + } 47 + }); 48 + } 49 + }; 50 + }
+2 -2
src/observable.ts
··· 1 - import { Source, Sink, SignalKind, TalkbackKind, Observer, Subject, TeardownFn } from './types' 1 + import { Source, SignalKind, TalkbackKind } from './types' 2 2 import { push, start, talkbackPlaceholder } from './helpers' 3 3 4 4 interface ObservableSubscription { ··· 6 6 unsubscribe(): void; 7 7 } 8 8 9 - export interface ObservableObserver<T> { 9 + interface ObservableObserver<T> { 10 10 next(value: T): void; 11 11 error(error: any): void; 12 12 complete(): void;