Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add share operator

+53
+52
src/callbag.re
··· 113 113 loopSources(0); 114 114 }; 115 115 116 + type shareStateT('a) = { 117 + sinks: Hashtbl.t(int, signalT('a) => unit), 118 + mutable idCounter: int, 119 + mutable talkback: talkbackT => unit, 120 + mutable ended: bool, 121 + mutable gotSignal: bool 122 + }; 123 + 124 + let share = source => { 125 + let state = { 126 + sinks: Hashtbl.create(10), 127 + idCounter: 0, 128 + talkback: (_: talkbackT) => (), 129 + ended: false, 130 + gotSignal: false 131 + }; 132 + 133 + sink => { 134 + let id = state.idCounter; 135 + Hashtbl.add(state.sinks, id, sink); 136 + state.idCounter = state.idCounter + 1; 137 + 138 + if (state.idCounter === 1) { 139 + source(signal => { 140 + switch (signal) { 141 + | Push(_) when !state.ended => { 142 + Hashtbl.iter((_, sink) => sink(signal), state.sinks); 143 + state.gotSignal = false; 144 + } 145 + | Start(x) => state.talkback = x 146 + | End => state.ended = true 147 + | _ => () 148 + } 149 + }); 150 + }; 151 + 152 + sink(Start(signal => { 153 + switch (signal) { 154 + | End => { 155 + Hashtbl.remove(state.sinks, id); 156 + if (Hashtbl.length(state.sinks) === 0) { 157 + state.ended = true; 158 + state.talkback(End); 159 + }; 160 + } 161 + | Pull when !state.gotSignal => state.talkback(signal) 162 + | _ => () 163 + } 164 + })); 165 + } 166 + }; 167 + 116 168 let forEach = (f, source) => 117 169 captureTalkback(source, [@bs] (signal, talkback) => { 118 170 switch (signal) {
+1
src/callbag.rei
··· 7 7 let filter: ('a => bool, (signalT('a) => unit) => unit, signalT('a) => unit) => unit; 8 8 let scan: (('b, 'a) => 'b, 'b, (signalT('a) => unit) => unit, signalT('b) => unit) => unit; 9 9 let merge: (array((signalT('a) => unit) => unit), signalT('a) => unit) => unit; 10 + let share: ((signalT('a) => unit) => unit, signalT('a) => unit) => unit; 10 11 11 12 let forEach: ('a => unit, (signalT('a) => unit) => unit) => unit; 12 13 let subscribe: ('a => unit, (signalT('a) => unit) => unit) => (unit => unit);