Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix up mergeMap/concatMap

+34 -24
+33 -23
src/operators.ts
··· 145 145 function applyInnerSource(innerSource: Source<Out>): void { 146 146 innerActive = true; 147 147 innerSource((signal) => { 148 - if (!innerActive) { 149 - /*noop*/ 150 - } else if (signal === SignalKind.End) { 151 - innerActive = false; 152 - if (inputQueue.length) { 153 - applyInnerSource(map(inputQueue.pop()!)) 154 - } else if (ended) { 155 - sink(SignalKind.End); 156 - } else if (!outerPulled) { 157 - outerPulled = true; 158 - outerTalkback(TalkbackKind.Pull); 148 + if (signal === SignalKind.End) { 149 + if (innerActive) { 150 + innerActive = false; 151 + if (inputQueue.length) { 152 + applyInnerSource(map(inputQueue.pop()!)) 153 + } else if (ended) { 154 + sink(SignalKind.End); 155 + } else if (!outerPulled) { 156 + outerPulled = true; 157 + outerTalkback(TalkbackKind.Pull); 158 + } 159 + } 160 + } else if (signal.tag === SignalKind.Start) { 161 + innerPulled = false; 162 + (innerTalkback = signal[0])(TalkbackKind.Pull); 163 + } else if (innerActive) { 164 + sink(signal); 165 + if (innerPulled) { 166 + innerPulled = false; 167 + } else { 168 + innerTalkback(TalkbackKind.Pull); 159 169 } 160 170 } 161 171 }); ··· 247 257 function applyInnerSource(innerSource: Source<Out>): void { 248 258 let talkback = talkbackPlaceholder; 249 259 innerSource((signal) => { 250 - if (!innerTalkbacks.length) { 251 - /*noop*/ 252 - } else if (signal === SignalKind.End) { 253 - const index = innerTalkbacks.indexOf(talkback); 254 - if (index > -1) innerTalkbacks.splice(index, 1); 255 - if (!innerTalkbacks.length) { 256 - if (ended) { 257 - sink(SignalKind.End); 258 - } else if (!outerPulled) { 259 - outerPulled = true; 260 - outerTalkback(TalkbackKind.Pull); 260 + if (signal === SignalKind.End) { 261 + if (innerTalkbacks.length) { 262 + const index = innerTalkbacks.indexOf(talkback); 263 + if (index > -1) innerTalkbacks.splice(index, 1); 264 + if (!innerTalkbacks.length) { 265 + if (ended) { 266 + sink(SignalKind.End); 267 + } else if (!outerPulled) { 268 + outerPulled = true; 269 + outerTalkback(TalkbackKind.Pull); 270 + } 261 271 } 262 272 } 263 273 } else if (signal.tag === SignalKind.Start) { 264 274 innerTalkbacks.push(talkback = signal[0]); 265 275 talkback(TalkbackKind.Pull); 266 - } else { 276 + } else if (innerTalkbacks.length) { 267 277 sink(signal); 268 278 talkback(TalkbackKind.Pull); 269 279 }
+1 -1
src/sources.test.ts
··· 152 152 expect(collectSignals(source)).toEqual([ 153 153 start(expect.any(Function)), 154 154 push(0), 155 - end(), 155 + SignalKind.End, 156 156 ]); 157 157 }); 158 158