Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add subscribe sink allowing to end callbags early

+25
+24
src/callbag.re
··· 122 122 | End => () 123 123 } 124 124 }); 125 + 126 + let subscribe = (f, source) => { 127 + let talkback = ref((_: talkbackT) => ()); 128 + let ended = ref(false); 129 + 130 + source(signal => { 131 + switch (signal) { 132 + | Start(x) => { 133 + talkback := x; 134 + talkback^(Pull); 135 + } 136 + | Push(x) when !ended^ => { 137 + f(x); 138 + talkback^(Pull); 139 + } 140 + | _ => () 141 + } 142 + }); 143 + 144 + () => if (!ended^) { 145 + ended := true; 146 + talkback^(End); 147 + } 148 + };
+1
src/callbag.rei
··· 9 9 let merge: (array((signalT('a) => unit) => unit), signalT('a) => unit) => unit; 10 10 11 11 let forEach: ('a => unit, (signalT('a) => unit) => unit) => unit; 12 + let subscribe: ('a => unit, (signalT('a) => unit) => unit) => (unit => unit);