Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Reimplement all operators in TS

+807 -9
-2
src/helpers.ts
··· 14 14 box.tag = SignalKind.Push; 15 15 return box 16 16 } 17 - 18 - export const end: SignalKind.End = 0
+800
src/operators.ts
··· 1 + import { Source, Sink, Operator, SignalKind, TalkbackKind, TalkbackFn } from './types' 2 + import { push, start, talkbackPlaceholder } from './helpers' 3 + import { fromArray } from './sources' 4 + 5 + const identity = <T>(x: T): T => x; 6 + 7 + export function buffer<S, T>(notifier: Source<S>): Operator<T, T[]> { 8 + return (source) => (sink) => { 9 + let buffer: T[] = []; 10 + let sourceTalkback = talkbackPlaceholder; 11 + let notifierTalkback = talkbackPlaceholder; 12 + let pulled = false; 13 + let ended = false; 14 + source((signal) => { 15 + if (ended) { 16 + /*noop*/ 17 + } else if (signal === SignalKind.End) { 18 + ended = true; 19 + notifierTalkback(TalkbackKind.Close); 20 + if (buffer.length) sink(push(buffer)); 21 + sink(SignalKind.End); 22 + } else if (signal.tag === SignalKind.Start) { 23 + sourceTalkback = signal[0]; 24 + notifier((signal) => { 25 + if (ended) { 26 + /*noop*/ 27 + } else if (signal === SignalKind.End) { 28 + ended = true; 29 + sourceTalkback(TalkbackKind.Close); 30 + if (buffer.length) sink(push(buffer)); 31 + sink(SignalKind.End); 32 + } else if (signal.tag === SignalKind.Start) { 33 + notifierTalkback = signal[0]; 34 + } else if (buffer.length) { 35 + const signal = push(buffer); 36 + buffer = []; 37 + sink(signal); 38 + } 39 + }); 40 + } else { 41 + buffer.push(signal[0]); 42 + if (!pulled) { 43 + pulled = true; 44 + sourceTalkback(TalkbackKind.Pull); 45 + notifierTalkback(TalkbackKind.Pull); 46 + } else { 47 + pulled = false; 48 + } 49 + } 50 + }); 51 + sink(start((signal) => { 52 + if (signal === TalkbackKind.Close && !ended) { 53 + ended = true; 54 + sourceTalkback(TalkbackKind.Close); 55 + notifierTalkback(TalkbackKind.Close); 56 + } else if (!ended && !pulled) { 57 + pulled = true; 58 + sourceTalkback(TalkbackKind.Pull); 59 + notifierTalkback(TalkbackKind.Pull); 60 + } 61 + })); 62 + }; 63 + } 64 + 65 + export function combine<A, B>(sourceA: Source<A>, sourceB: Source<B>): Source<[A, B]> { 66 + return (sink) => { 67 + let lastValA: A | void; 68 + let lastValB: B | void; 69 + let talkbackA = talkbackPlaceholder; 70 + let talkbackB = talkbackPlaceholder; 71 + let gotSignal = false; 72 + let gotEnd = false; 73 + let ended = false; 74 + sourceA((signal) => { 75 + if (signal === SignalKind.End) { 76 + if (!gotEnd) { 77 + gotEnd = true; 78 + } else { 79 + ended = true; 80 + sink(SignalKind.End); 81 + } 82 + } else if (signal.tag === SignalKind.Start) { 83 + talkbackA = signal[0]; 84 + } else if (lastValB === undefined && !ended) { 85 + lastValA = signal[0]; 86 + if (!gotSignal) { 87 + talkbackB(TalkbackKind.Pull); 88 + } else { 89 + gotSignal = false; 90 + } 91 + } else if (!ended) { 92 + lastValA = signal[0]; 93 + gotSignal = false; 94 + sink(push([lastValA, lastValB] as [A, B])); 95 + } 96 + }); 97 + sourceB((signal) => { 98 + if (signal === SignalKind.End) { 99 + if (!gotEnd) { 100 + gotEnd = true; 101 + } else { 102 + ended = true; 103 + sink(SignalKind.End); 104 + } 105 + } else if (signal.tag === SignalKind.Start) { 106 + talkbackB = signal[0]; 107 + } else if (lastValA === undefined && !ended) { 108 + lastValB = signal[0]; 109 + if (!gotSignal) { 110 + talkbackB(TalkbackKind.Pull); 111 + } else { 112 + gotSignal = false; 113 + } 114 + } else if (!ended) { 115 + lastValB = signal[0]; 116 + gotSignal = false; 117 + sink(push([lastValA, lastValB] as [A, B])); 118 + } 119 + }); 120 + sink(start((signal) => { 121 + if (ended) { 122 + /*noop*/ 123 + } else if (signal === TalkbackKind.Close) { 124 + ended = true; 125 + talkbackA(TalkbackKind.Close); 126 + talkbackB(TalkbackKind.Close); 127 + } else if (!gotSignal) { 128 + gotSignal = true; 129 + talkbackA(TalkbackKind.Pull); 130 + talkbackB(TalkbackKind.Pull); 131 + } 132 + })); 133 + }; 134 + } 135 + 136 + export function concatMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> { 137 + return (source) => (sink) => { 138 + const inputQueue: In[] = []; 139 + let outerTalkback = talkbackPlaceholder; 140 + let innerTalkback = talkbackPlaceholder; 141 + let outerPulled = false; 142 + let innerPulled = false; 143 + let innerActive = false; 144 + let ended = false; 145 + function applyInnerSource(innerSource: Source<Out>): void { 146 + innerActive = true; 147 + innerSource((signal) => { 148 + if (!innerActive) { 149 + /*noop*/ 150 + } else if (signal === SignalKind.End) { 151 + innerActive = false; 152 + if (inputQueue.length) { 153 + applyInnerSource(map(inputQueue.pop()!)) 154 + } else if (ended) { 155 + sink(SignalKind.End); 156 + } else if (!outerPulled) { 157 + outerPulled = true; 158 + outerTalkback(TalkbackKind.Pull); 159 + } 160 + } 161 + }); 162 + } 163 + source((signal) => { 164 + if (ended) { 165 + /*noop*/ 166 + } else if (signal === SignalKind.End) { 167 + ended = true; 168 + if (!innerActive && !inputQueue.length) 169 + sink(SignalKind.End); 170 + } else if (signal.tag === SignalKind.Start) { 171 + outerTalkback = signal[0]; 172 + } else { 173 + outerPulled = false; 174 + if (innerActive) { 175 + inputQueue.push(signal[0]); 176 + } else { 177 + applyInnerSource(map(signal[0])); 178 + } 179 + } 180 + }); 181 + sink(start((signal) => { 182 + if (signal === TalkbackKind.Close) { 183 + if (!ended) { 184 + ended = true; 185 + outerTalkback(TalkbackKind.Close); 186 + } 187 + if (innerActive) { 188 + innerActive = false; 189 + innerTalkback(TalkbackKind.Close); 190 + } 191 + } else { 192 + if (!ended && !outerPulled) { 193 + outerPulled = true; 194 + outerTalkback(TalkbackKind.Pull); 195 + } 196 + if (innerActive && !innerPulled) { 197 + innerPulled = true; 198 + innerTalkback(TalkbackKind.Pull); 199 + } 200 + } 201 + })); 202 + }; 203 + } 204 + 205 + export function concatAll<T>(source: Source<Source<T>>): Source<T> { 206 + return concatMap<Source<T>, T>(identity)(source); 207 + } 208 + 209 + export function concat<T>(sources: Source<T>[]): Source<T> { 210 + return concatAll(fromArray(sources)); 211 + } 212 + 213 + export function filter<T>(predicate: (value: T) => boolean): Operator<T, T> { 214 + return (source) => (sink) => { 215 + let talkback = talkbackPlaceholder; 216 + source((signal) => { 217 + if (signal === SignalKind.End) { 218 + sink(signal); 219 + } else if (signal.tag === SignalKind.Start) { 220 + talkback = signal[0]; 221 + sink(signal); 222 + } else if (!predicate(signal[0])) { 223 + talkback(TalkbackKind.Pull); 224 + } else { 225 + sink(signal); 226 + } 227 + }); 228 + }; 229 + } 230 + 231 + export function map<In, Out>(map: (value: In) => Out): Operator<In, Out> { 232 + return (source) => (sink) => source((signal) => { 233 + if (signal === SignalKind.End || signal.tag === SignalKind.Start) { 234 + sink(signal); 235 + } else { 236 + sink(push(map(signal[0]))); 237 + } 238 + }); 239 + } 240 + 241 + export function mergeMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> { 242 + return (source) => (sink) => { 243 + const innerTalkbacks: TalkbackFn[] = []; 244 + let outerTalkback = talkbackPlaceholder; 245 + let outerPulled = false; 246 + let ended = false; 247 + function applyInnerSource(innerSource: Source<Out>): void { 248 + let talkback = talkbackPlaceholder; 249 + innerSource((signal) => { 250 + if (!innerTalkbacks.length) { 251 + /*noop*/ 252 + } else if (signal === SignalKind.End) { 253 + const index = innerTalkbacks.indexOf(talkback); 254 + if (index > -1) innerTalkbacks.splice(index, 1); 255 + if (!innerTalkbacks.length) { 256 + if (ended) { 257 + sink(SignalKind.End); 258 + } else if (!outerPulled) { 259 + outerPulled = true; 260 + outerTalkback(TalkbackKind.Pull); 261 + } 262 + } 263 + } else if (signal.tag === SignalKind.Start) { 264 + innerTalkbacks.push(talkback = signal[0]); 265 + talkback(TalkbackKind.Pull); 266 + } else { 267 + sink(signal); 268 + talkback(TalkbackKind.Pull); 269 + } 270 + }); 271 + } 272 + source((signal) => { 273 + if (ended) { 274 + /*noop*/ 275 + } else if (signal === SignalKind.End) { 276 + ended = true; 277 + if (!innerTalkbacks.length) 278 + sink(SignalKind.End); 279 + } else if (signal.tag === SignalKind.Start) { 280 + outerTalkback = signal[0]; 281 + } else { 282 + outerPulled = false; 283 + applyInnerSource(map(signal[0])); 284 + if (!outerPulled) { 285 + outerPulled = true; 286 + outerTalkback(TalkbackKind.Pull); 287 + } 288 + } 289 + }); 290 + sink(start((signal) => { 291 + if (signal === TalkbackKind.Close) { 292 + if (!ended) { 293 + ended = true; 294 + outerTalkback(TalkbackKind.Close); 295 + } 296 + while (innerTalkbacks.length) 297 + innerTalkbacks.pop()!(TalkbackKind.Close); 298 + } else { 299 + if (!ended && !outerPulled) { 300 + outerPulled = true; 301 + outerTalkback(TalkbackKind.Pull); 302 + } else { 303 + outerPulled = false; 304 + } 305 + for (let i = 0; i < innerTalkbacks.length; i++) 306 + innerTalkbacks[i](TalkbackKind.Pull); 307 + } 308 + })); 309 + }; 310 + } 311 + 312 + export function mergeAll<T>(source: Source<Source<T>>): Source<T> { 313 + return mergeMap<Source<T>, T>(identity)(source); 314 + } 315 + 316 + export function merge<T>(sources: Source<T>[]): Source<T> { 317 + return mergeAll(fromArray(sources)); 318 + } 319 + 320 + export function onEnd<T>(callback: () => void): Operator<T, T> { 321 + return (source) => (sink) => { 322 + let ended = false; 323 + source((signal) => { 324 + if (ended) { 325 + /*noop*/ 326 + } else if (signal === SignalKind.End) { 327 + ended = true; 328 + sink(SignalKind.End); 329 + callback(); 330 + } else if (signal.tag === SignalKind.Start) { 331 + const talkback = signal[0]; 332 + sink(start((signal) => { 333 + if (signal === TalkbackKind.Close) { 334 + ended = true; 335 + talkback(TalkbackKind.Close); 336 + callback(); 337 + } else { 338 + talkback(signal); 339 + } 340 + })); 341 + } else { 342 + sink(signal); 343 + } 344 + }); 345 + }; 346 + } 347 + 348 + export function onPush<T>(callback: (value: T) => void): Operator<T, T> { 349 + return (source) => (sink) => { 350 + let ended = false; 351 + source((signal) => { 352 + if (ended) { 353 + /*noop*/ 354 + } else if (signal === SignalKind.End) { 355 + ended = true; 356 + sink(SignalKind.End); 357 + } else if (signal.tag === SignalKind.Start) { 358 + const talkback = signal[0]; 359 + sink(start((signal) => { 360 + if (signal === TalkbackKind.Close) ended = true; 361 + talkback(signal); 362 + })); 363 + } else { 364 + callback(signal[0]); 365 + sink(signal); 366 + } 367 + }); 368 + }; 369 + } 370 + 371 + export function onStart<T>(callback: () => void): Operator<T, T> { 372 + return (source) => (sink) => source((signal) => { 373 + if (signal === SignalKind.End) { 374 + sink(SignalKind.End); 375 + } else if (signal.tag === SignalKind.Start) { 376 + sink(signal); 377 + callback(); 378 + } else { 379 + sink(signal); 380 + } 381 + }); 382 + } 383 + 384 + export function sample<S, T>(notifier: Source<S>): Operator<T, T> { 385 + return (source) => (sink) => { 386 + let sourceTalkback = talkbackPlaceholder; 387 + let notifierTalkback = talkbackPlaceholder; 388 + let value: T | void; 389 + let pulled = false; 390 + let ended = false; 391 + source((signal) => { 392 + if (ended) { 393 + /*noop*/ 394 + } else if (signal === SignalKind.End) { 395 + ended = true; 396 + notifierTalkback(TalkbackKind.Close); 397 + sink(SignalKind.End); 398 + } else if (signal.tag === SignalKind.Start) { 399 + sourceTalkback = signal[0]; 400 + } else { 401 + value = signal[0]; 402 + if (!pulled) { 403 + pulled = true; 404 + notifierTalkback(TalkbackKind.Pull); 405 + sourceTalkback(TalkbackKind.Pull); 406 + } else { 407 + pulled = false; 408 + } 409 + } 410 + }); 411 + notifier((signal) => { 412 + if (ended) { 413 + /*noop*/ 414 + } else if (signal === SignalKind.End) { 415 + ended = true; 416 + sourceTalkback(TalkbackKind.Close); 417 + sink(SignalKind.End); 418 + } else if (signal.tag === SignalKind.Start) { 419 + notifierTalkback = signal[0]; 420 + } else if (value !== undefined) { 421 + const signal = push(value); 422 + value = undefined; 423 + sink(signal); 424 + } 425 + }); 426 + sink(start((signal) => { 427 + if (signal === TalkbackKind.Close && !ended) { 428 + ended = true; 429 + sourceTalkback(TalkbackKind.Close); 430 + notifierTalkback(TalkbackKind.Close); 431 + } else if (!ended && !pulled) { 432 + pulled = true; 433 + sourceTalkback(TalkbackKind.Pull); 434 + notifierTalkback(TalkbackKind.Pull); 435 + } 436 + })); 437 + }; 438 + } 439 + 440 + export function scan<In, Out>(reducer: (acc: Out, value: In) => Out, seed: Out): Operator<In, Out> { 441 + return (source) => (sink) => { 442 + let acc = seed; 443 + source((signal) => { 444 + if (signal === SignalKind.End) { 445 + sink(SignalKind.End); 446 + } else if (signal.tag === SignalKind.Start) { 447 + sink(signal); 448 + } else { 449 + sink(push(acc = reducer(acc, signal[0]))); 450 + } 451 + }); 452 + }; 453 + } 454 + 455 + export function share<T>(source: Source<T>): Source<T> { 456 + const sinks: Sink<T>[] = []; 457 + let talkback = talkbackPlaceholder; 458 + let gotSignal = false; 459 + return (sink) => { 460 + sinks.push(sink); 461 + if (sinks.length === 1) { 462 + source((signal) => { 463 + if (signal === SignalKind.End) { 464 + while (sinks.length) sinks.pop()!(SignalKind.End); 465 + } else if (signal.tag === SignalKind.Start) { 466 + talkback = signal[0]; 467 + } else { 468 + gotSignal = false; 469 + for (let i = 0; i < sinks.length; i++) sinks[i](signal); 470 + } 471 + }); 472 + } 473 + sink(start((signal) => { 474 + if (signal === TalkbackKind.Close) { 475 + const index = sinks.indexOf(sink); 476 + if (index > -1) sinks.splice(index, 1); 477 + if (!sinks.length) talkback(TalkbackKind.Close); 478 + } else if (signal === TalkbackKind.Pull && !gotSignal) { 479 + gotSignal = true; 480 + talkback(TalkbackKind.Pull); 481 + } 482 + })); 483 + }; 484 + } 485 + 486 + export function skip<T>(wait: number): Operator<T, T> { 487 + return (source) => (sink) => { 488 + let talkback = talkbackPlaceholder; 489 + let rest = wait; 490 + source((signal) => { 491 + if (signal === SignalKind.End) { 492 + sink(SignalKind.End); 493 + } else if (signal.tag === SignalKind.Start) { 494 + talkback = signal[0]; 495 + sink(signal); 496 + } else if (rest-- > 0) { 497 + talkback(TalkbackKind.Pull); 498 + } else { 499 + sink(signal); 500 + } 501 + }); 502 + }; 503 + } 504 + 505 + export function skipUntil<S, T>(notifier: Source<S>): Operator<T, T> { 506 + return (source) => (sink) => { 507 + let sourceTalkback = talkbackPlaceholder; 508 + let notifierTalkback = talkbackPlaceholder; 509 + let skip = true; 510 + let pulled = false; 511 + let ended = false; 512 + source((signal) => { 513 + if (ended) { 514 + /*noop*/ 515 + } else if (signal === SignalKind.End) { 516 + ended = true; 517 + if (skip) notifierTalkback(TalkbackKind.Close); 518 + sink(SignalKind.End); 519 + } else if (signal.tag === SignalKind.Start) { 520 + sourceTalkback = signal[0]; 521 + notifier((signal) => { 522 + if (signal === SignalKind.End) { 523 + if (skip) { 524 + ended = true; 525 + sourceTalkback(TalkbackKind.Close); 526 + } 527 + } else if (signal.tag === SignalKind.Start) { 528 + (notifierTalkback = signal[0])(TalkbackKind.Pull); 529 + } else { 530 + skip = false; 531 + notifierTalkback(TalkbackKind.Close); 532 + } 533 + }); 534 + } else if (!skip) { 535 + pulled = false; 536 + sink(signal); 537 + } else if (!pulled) { 538 + pulled = true; 539 + sourceTalkback(TalkbackKind.Pull); 540 + notifierTalkback(TalkbackKind.Pull); 541 + } else { 542 + pulled = false; 543 + } 544 + }); 545 + sink(start((signal) => { 546 + if (signal === TalkbackKind.Close && !ended) { 547 + ended = true; 548 + sourceTalkback(TalkbackKind.Close); 549 + if (skip) notifierTalkback(TalkbackKind.Close); 550 + } else if (!ended && !pulled) { 551 + pulled = true; 552 + if (skip) notifierTalkback(TalkbackKind.Pull); 553 + sourceTalkback(TalkbackKind.Pull); 554 + } 555 + })); 556 + }; 557 + } 558 + 559 + export function skipWhile<T>(predicate: (value: T) => boolean): Operator<T, T> { 560 + return (source) => (sink) => { 561 + let talkback = talkbackPlaceholder; 562 + let skip = true; 563 + source((signal) => { 564 + if (signal === SignalKind.End) { 565 + sink(SignalKind.End); 566 + } else if (signal.tag === SignalKind.Start) { 567 + talkback = signal[0]; 568 + sink(signal); 569 + } else if (skip) { 570 + if (predicate(signal[0])) { 571 + talkback(TalkbackKind.Pull); 572 + } else { 573 + skip = false; 574 + sink(signal); 575 + } 576 + } else { 577 + sink(signal); 578 + } 579 + }); 580 + }; 581 + } 582 + 583 + export function switchMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> { 584 + return (source) => (sink) => { 585 + let outerTalkback = talkbackPlaceholder; 586 + let innerTalkback = talkbackPlaceholder; 587 + let outerPulled = false; 588 + let innerPulled = false; 589 + let innerActive = false; 590 + let ended = false; 591 + function applyInnerSource(innerSource: Source<Out>): void { 592 + innerActive = true; 593 + innerSource((signal) => { 594 + if (!innerActive) { 595 + /*noop*/ 596 + } else if (signal === SignalKind.End) { 597 + innerActive = false; 598 + if (ended) { 599 + sink(SignalKind.End); 600 + } else if (!outerPulled) { 601 + outerPulled = true; 602 + outerTalkback(TalkbackKind.Pull); 603 + } 604 + } else if (signal.tag === SignalKind.Start) { 605 + innerPulled = false; 606 + (innerTalkback = signal[0])(TalkbackKind.Pull); 607 + } else { 608 + sink(signal); 609 + if (!innerPulled) { 610 + innerTalkback(TalkbackKind.Pull); 611 + } else { 612 + innerPulled = false; 613 + } 614 + } 615 + }); 616 + } 617 + source((signal) => { 618 + if (ended) { 619 + /*noop*/ 620 + } else if (signal === SignalKind.End) { 621 + ended = true; 622 + if (!innerActive) 623 + sink(SignalKind.End); 624 + } else if (signal.tag === SignalKind.Start) { 625 + outerTalkback = signal[0]; 626 + } else { 627 + if (innerActive) { 628 + innerTalkback(TalkbackKind.Close); 629 + innerTalkback = talkbackPlaceholder; 630 + } 631 + if (!outerPulled) { 632 + outerPulled = true; 633 + outerTalkback(TalkbackKind.Pull); 634 + } else { 635 + outerPulled = false; 636 + } 637 + applyInnerSource(map(signal[0])); 638 + } 639 + }); 640 + sink(start((signal) => { 641 + if (signal === TalkbackKind.Close) { 642 + if (!ended) { 643 + ended = true; 644 + outerTalkback(TalkbackKind.Close); 645 + } 646 + if (innerActive) { 647 + innerActive = false; 648 + innerTalkback(TalkbackKind.Close); 649 + } 650 + } else { 651 + if (!ended && !outerPulled) { 652 + outerPulled = true; 653 + outerTalkback(TalkbackKind.Pull); 654 + } 655 + if (innerActive && !innerPulled) { 656 + innerPulled = true; 657 + innerTalkback(TalkbackKind.Pull); 658 + } 659 + } 660 + })); 661 + }; 662 + } 663 + 664 + export function switchAll<T>(source: Source<Source<T>>): Source<T> { 665 + return switchMap<Source<T>, T>(identity)(source); 666 + } 667 + 668 + export function take<T>(max: number): Operator<T, T> { 669 + return (source) => (sink) => { 670 + let talkback = talkbackPlaceholder; 671 + let ended = false; 672 + let taken = 0; 673 + source((signal) => { 674 + if (ended) { 675 + /*noop*/ 676 + } else if (signal === SignalKind.End) { 677 + ended = true; 678 + sink(SignalKind.End); 679 + } else if (signal.tag === SignalKind.Start) { 680 + if (max <= 0) { 681 + ended = true; 682 + sink(SignalKind.End); 683 + signal[0](TalkbackKind.Close); 684 + } else { 685 + talkback = signal[0]; 686 + } 687 + } else if (taken++ < max) { 688 + sink(signal); 689 + if (!ended && taken >= max) { 690 + ended = true; 691 + sink(SignalKind.End); 692 + talkback(TalkbackKind.Close); 693 + } 694 + } else { 695 + sink(signal); 696 + } 697 + }); 698 + sink(start((signal) => { 699 + if (signal === TalkbackKind.Close && !ended) { 700 + ended = true; 701 + talkback(TalkbackKind.Close); 702 + } else if (signal === TalkbackKind.Pull && !ended && taken < max) { 703 + talkback(TalkbackKind.Pull); 704 + } 705 + })); 706 + }; 707 + } 708 + 709 + export function takeLast<T>(max: number): Operator<T, T> { 710 + return (source) => (sink) => { 711 + const queue: T[] = []; 712 + let talkback = talkbackPlaceholder; 713 + source((signal) => { 714 + if (signal === SignalKind.End) { 715 + fromArray(queue)(sink); 716 + } else if (signal.tag === SignalKind.Start) { 717 + if (max <= 0) { 718 + signal[0](TalkbackKind.Close); 719 + fromArray(queue)(sink); 720 + } else { 721 + (talkback = signal[0])(TalkbackKind.Pull); 722 + } 723 + } else { 724 + if (queue.length >= max && max) queue.shift(); 725 + queue.push(signal[0]); 726 + talkback(TalkbackKind.Pull); 727 + } 728 + }); 729 + }; 730 + } 731 + 732 + export function takeUntil<S, T>(notifier: Source<S>): Operator<T, T> { 733 + return (source) => (sink) => { 734 + let sourceTalkback = talkbackPlaceholder; 735 + let notifierTalkback = talkbackPlaceholder; 736 + let ended = false; 737 + source((signal) => { 738 + if (ended) { 739 + /*noop*/ 740 + } else if (signal === SignalKind.End) { 741 + ended = true; 742 + notifierTalkback(TalkbackKind.Close); 743 + sink(SignalKind.End); 744 + } else if (signal.tag === SignalKind.Start) { 745 + sourceTalkback = signal[0]; 746 + notifier((signal) => { 747 + if (signal === SignalKind.End) { 748 + /*noop*/ 749 + } else if (signal.tag === SignalKind.Start) { 750 + (notifierTalkback = signal[0])(TalkbackKind.Pull); 751 + } else { 752 + ended = true; 753 + sourceTalkback(TalkbackKind.Close); 754 + sink(SignalKind.End); 755 + } 756 + }); 757 + } else { 758 + sink(signal); 759 + } 760 + }); 761 + sink(start((signal) => { 762 + if (signal === TalkbackKind.Close && !ended) { 763 + ended = true; 764 + sourceTalkback(TalkbackKind.Close); 765 + notifierTalkback(TalkbackKind.Close); 766 + } else if (!ended) { 767 + sourceTalkback(TalkbackKind.Pull); 768 + } 769 + })); 770 + }; 771 + } 772 + 773 + export function takeWhile<T>(predicate: (value: T) => boolean): Operator<T, T> { 774 + return (source) => (sink) => { 775 + let talkback = talkbackPlaceholder; 776 + let ended = false; 777 + source((signal) => { 778 + if (ended) { 779 + /*noop*/ 780 + } else if (signal === SignalKind.End) { 781 + ended = true; 782 + sink(SignalKind.End); 783 + } else if (signal.tag === SignalKind.Start) { 784 + talkback = signal[0]; 785 + sink(signal); 786 + } else if (!predicate(signal[0])) { 787 + ended = true; 788 + sink(SignalKind.End); 789 + talkback(TalkbackKind.Close); 790 + } else { 791 + sink(signal); 792 + } 793 + }); 794 + }; 795 + } 796 + 797 + export { 798 + mergeAll as flatten, 799 + onPush as tap, 800 + }
+7 -7
src/sources.ts
··· 1 - import { Source, Sink, TalkbackKind, Observer, Subject, TeardownFn } from './types' 2 - import { push, start, end, talkbackPlaceholder } from './helpers' 1 + import { Source, Sink, SignalKind, TalkbackKind, Observer, Subject, TeardownFn } from './types' 2 + import { push, start, talkbackPlaceholder } from './helpers' 3 3 4 4 export function fromArray<T>(array: T[]): Source<T> { 5 5 return (sink) => { ··· 21 21 sink(push(array[current])); 22 22 } else { 23 23 ended = true; 24 - sink(end); 24 + sink(SignalKind.End); 25 25 } 26 26 } 27 27 looping = false; ··· 39 39 } else if (!ended) { 40 40 ended = true; 41 41 sink(push(value)); 42 - sink(end); 42 + sink(SignalKind.End); 43 43 } 44 44 })) 45 45 } ··· 55 55 complete() { 56 56 if (!ended) { 57 57 ended = false; 58 - sink(end); 58 + sink(SignalKind.End); 59 59 } 60 60 }, 61 61 }); ··· 90 90 complete() { 91 91 if (!ended) { 92 92 ended = true; 93 - for (let i = 0; i < sinks.length; i++) sinks[i](end); 93 + for (let i = 0; i < sinks.length; i++) sinks[i](SignalKind.End); 94 94 } 95 95 }, 96 96 }; ··· 103 103 ended = true; 104 104 } else if (!ended) { 105 105 ended = true; 106 - sink(end); 106 + sink(SignalKind.End); 107 107 } 108 108 })); 109 109 };