Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Update tests

+742 -678
+6 -1
package.json
··· 42 42 "devDependencies": { 43 43 "@types/jest": "^28.1.6", 44 44 "@types/node": "^18.7.2", 45 + "@types/zen-observable": "^0.8.3", 46 + "callbag-from-iter": "^1.3.0", 47 + "callbag-iterate": "^1.0.0", 48 + "callbag-take": "^1.5.0", 45 49 "jest": "^28.1.3", 46 - "typescript": "^4.7.4" 50 + "typescript": "^4.7.4", 51 + "zen-observable": "^0.8.15" 47 52 } 48 53 }
+272 -246
src/Wonka_operators.test.ts src/operators.test.ts
··· 1 - import * as deriving from './helpers/Wonka_deriving'; 2 - import * as sources from './Wonka_sources.gen'; 3 - import * as sinks from './Wonka_sinks.gen'; 4 - import * as operators from './Wonka_operators.gen'; 5 - import * as web from './web/WonkaJs.gen'; 6 - import * as types from './Wonka_types.gen'; 1 + import { Source, Sink, Operator, Signal, SignalKind, TalkbackKind, TalkbackFn } from './types'; 2 + import { push, start } from './helpers'; 3 + 4 + import * as sources from './sources'; 5 + import * as sinks from './sinks'; 6 + import * as operators from './operators'; 7 7 8 8 /* This tests a noop operator for passive Pull talkback signals. 9 9 A Pull will be sent from the sink upwards and should pass through 10 10 the operator until the source receives it, which then pushes a 11 11 value down. */ 12 12 const passesPassivePull = ( 13 - operator: types.operatorT<any, any>, 13 + operator: Operator<any, any>, 14 14 output: any = 0 15 - ) => 15 + ) => { 16 16 it('responds to Pull talkback signals (spec)', () => { 17 - let talkback = null; 18 - let push = 0; 19 - const values = []; 17 + let talkback: TalkbackFn | null = null; 18 + let pushes = 0; 19 + const values: any[] = []; 20 20 21 - const source: types.sourceT<any> = sink => { 22 - sink(deriving.start(tb => { 23 - if (!push && tb === deriving.pull) { 24 - push++; 25 - sink(deriving.push(0)); 21 + const source: Source<any> = sink => { 22 + sink(start((signal) => { 23 + if (!pushes && signal === TalkbackKind.Pull) { 24 + pushes++; 25 + sink(push(0)); 26 26 } 27 27 })); 28 28 }; 29 29 30 - const sink: types.sinkT<any> = signal => { 31 - expect(deriving.isEnd(signal)).toBeFalsy(); 32 - if (deriving.isPush(signal)) { 33 - values.push(deriving.unboxPush(signal)); 34 - } else if (deriving.isStart(signal)) { 35 - talkback = deriving.unboxStart(signal); 30 + const sink: Sink<any> = (signal) => { 31 + expect(signal).not.toBe(SignalKind.End); 32 + if (signal === SignalKind.End) { 33 + /*noop*/ 34 + } else if (signal.tag === SignalKind.Push) { 35 + values.push(signal[0]); 36 + } else { 37 + talkback = signal[0]; 36 38 } 37 39 }; 38 40 ··· 43 45 expect(values).toEqual([]); 44 46 45 47 // When pulling a value we expect an immediate response 46 - talkback(deriving.pull); 48 + talkback!(TalkbackKind.Pull); 47 49 jest.runAllTimers(); 48 50 expect(values).toEqual([output]); 49 51 }); 52 + }; 50 53 51 54 /* This tests a noop operator for regular, active Push signals. 52 55 A Push will be sent downwards from the source, through the 53 56 operator to the sink. Pull events should be let through from 54 57 the sink after every Push event. */ 55 58 const passesActivePush = ( 56 - operator: types.operatorT<any, any>, 59 + operator: Operator<any, any>, 57 60 result: any = 0 58 - ) => 61 + ) => { 59 62 it('responds to eager Push signals (spec)', () => { 60 - const values = []; 61 - let talkback = null; 62 - let push = null; 63 + const values: any[] = []; 64 + let sink: Sink<any> | null = null; 63 65 let pulls = 0; 64 66 65 - const source: types.sourceT<any> = sink => { 66 - push = (value: any) => sink(deriving.push(value)); 67 - sink(deriving.start(tb => { 68 - if (tb === deriving.pull) 67 + const source: Source<any> = (_sink) => { 68 + sink = _sink; 69 + sink(start((signal) => { 70 + if (signal === TalkbackKind.Pull) 69 71 pulls++; 70 72 })); 71 73 }; 72 74 73 - const sink: types.sinkT<any> = signal => { 74 - expect(deriving.isEnd(signal)).toBeFalsy(); 75 - if (deriving.isStart(signal)) { 76 - talkback = deriving.unboxStart(signal); 77 - } else if (deriving.isPush(signal)) { 78 - values.push(deriving.unboxPush(signal)); 79 - talkback(deriving.pull); 75 + operator(source)((signal) => { 76 + expect(signal).not.toBe(SignalKind.End); 77 + if (signal === SignalKind.End) { 78 + /*noop*/ 79 + } else if (signal.tag === SignalKind.Push) { 80 + values.push(signal[0]); 80 81 } 81 - }; 82 + }); 82 83 83 - operator(source)(sink); 84 84 // No Pull signals should be issued initially 85 85 expect(pulls).toBe(0); 86 86 87 87 // When pushing a value we expect an immediate response 88 - push(0); 88 + sink!(push(0)); 89 89 jest.runAllTimers(); 90 90 expect(values).toEqual([result]); 91 91 // Subsequently the Pull signal should have travelled upwards 92 92 expect(pulls).toBe(1); 93 93 }); 94 + }; 94 95 95 96 /* This tests a noop operator for Close talkback signals from the sink. 96 97 A Close signal will be sent, which should be forwarded to the source, 97 98 which then ends the communication without sending an End signal. */ 98 - const passesSinkClose = (operator: types.operatorT<any, any>) => 99 + const passesSinkClose = (operator: Operator<any, any>) => { 99 100 it('responds to Close signals from sink (spec)', () => { 100 - let talkback = null; 101 + let talkback: TalkbackFn | null = null; 101 102 let closing = 0; 102 103 103 - const source: types.sourceT<any> = sink => { 104 - sink(deriving.start(tb => { 105 - if (tb === deriving.pull && !closing) { 106 - sink(deriving.push(0)); 107 - } else if (tb === deriving.close) { 104 + const source: Source<any> = sink => { 105 + sink(start((signal) => { 106 + if (signal === TalkbackKind.Pull && !closing) { 107 + sink(push(0)); 108 + } else if (signal === TalkbackKind.Close) { 108 109 closing++; 109 110 } 110 111 })); 111 112 }; 112 113 113 - const sink: types.sinkT<any> = signal => { 114 - expect(deriving.isEnd(signal)).toBeFalsy(); 115 - if (deriving.isStart(signal)) { 116 - talkback = deriving.unboxStart(signal); 117 - } else if (deriving.isPush(signal)) { 118 - talkback(deriving.close); 114 + const sink: Sink<any> = signal => { 115 + expect(signal).not.toBe(SignalKind.End); 116 + if (signal === SignalKind.End) { 117 + /*noop*/ 118 + } else if (signal.tag === SignalKind.Push) { 119 + talkback!(TalkbackKind.Close); 120 + } else { 121 + talkback = signal[0]; 119 122 } 120 123 }; 121 124 122 125 operator(source)(sink); 123 126 124 127 // When pushing a value we expect an immediate close signal 125 - talkback(deriving.pull); 128 + talkback!(TalkbackKind.Pull); 126 129 jest.runAllTimers(); 127 130 expect(closing).toBe(1); 128 131 }); 132 + }; 129 133 130 134 /* This tests a noop operator for End signals from the source. 131 135 A Push and End signal will be sent after the first Pull talkback 132 136 signal from the sink, which shouldn't lead to any extra Close or Pull 133 137 talkback signals. */ 134 138 const passesSourceEnd = ( 135 - operator: types.operatorT<any, any>, 139 + operator: Operator<any, any>, 136 140 result: any = 0 137 - ) => 141 + ) => { 138 142 it('passes on immediate Push then End signals from source (spec)', () => { 139 - const signals = []; 140 - let talkback = null; 143 + const signals: Signal<any>[] = []; 144 + let talkback: TalkbackFn | null = null; 141 145 let pulls = 0; 142 146 let ending = 0; 143 147 144 - const source: types.sourceT<any> = sink => { 145 - sink(deriving.start(tb => { 146 - expect(tb).not.toBe(deriving.close); 147 - if (tb === deriving.pull) { 148 + const source: Source<any> = sink => { 149 + sink(start((signal) => { 150 + expect(signal).not.toBe(TalkbackKind.Close); 151 + if (signal === TalkbackKind.Pull) { 148 152 pulls++; 149 153 if (pulls === 1) { 150 - sink(deriving.push(0)); 151 - sink(deriving.end()); 154 + sink(push(0)); 155 + sink(SignalKind.End); 152 156 } 153 157 } 154 158 })); 155 159 }; 156 160 157 - const sink: types.sinkT<any> = signal => { 158 - if (deriving.isStart(signal)) { 159 - talkback = deriving.unboxStart(signal); 160 - } else { 161 + const sink: Sink<any> = signal => { 162 + if (signal === SignalKind.End) { 163 + signals.push(signal); 164 + ending++; 165 + } else if (signal.tag === SignalKind.Push) { 161 166 signals.push(signal); 162 - if (deriving.isEnd(signal)) ending++; 167 + } else { 168 + talkback = signal[0]; 163 169 } 164 170 }; 165 171 166 172 operator(source)(sink); 167 173 168 174 // When pushing a value we expect an immediate Push then End signal 169 - talkback(deriving.pull); 175 + talkback!(TalkbackKind.Pull); 170 176 jest.runAllTimers(); 171 177 expect(ending).toBe(1); 172 - expect(signals).toEqual([deriving.push(result), deriving.end()]); 178 + expect(signals).toEqual([push(result), SignalKind.End]); 173 179 // Also no additional pull event should be created by the operator 174 180 expect(pulls).toBe(1); 175 181 }); 182 + }; 176 183 177 184 /* This tests a noop operator for End signals from the source 178 185 after the first pull in response to another. 179 186 This is similar to passesSourceEnd but more well behaved since 180 187 mergeMap/switchMap/concatMap are eager operators. */ 181 188 const passesSourcePushThenEnd = ( 182 - operator: types.operatorT<any, any>, 189 + operator: Operator<any, any>, 183 190 result: any = 0 184 - ) => 191 + ) => { 185 192 it('passes on End signals from source (spec)', () => { 186 - const signals = []; 187 - let talkback = null; 193 + const signals: Signal<any>[] = []; 194 + let talkback: TalkbackFn | null = null; 188 195 let pulls = 0; 189 196 let ending = 0; 190 197 191 - const source: types.sourceT<any> = sink => { 192 - sink(deriving.start(tb => { 193 - expect(tb).not.toBe(deriving.close); 194 - if (tb === deriving.pull) { 198 + const source: Source<any> = sink => { 199 + sink(start((signal) => { 200 + expect(signal).not.toBe(TalkbackKind.Close); 201 + if (signal === TalkbackKind.Pull) { 195 202 pulls++; 196 - if (pulls <= 2) { sink(deriving.push(0)); } 197 - else { sink(deriving.end()); } 203 + if (pulls <= 2) { sink(push(0)); } 204 + else { sink(SignalKind.End); } 198 205 } 199 206 })); 200 207 }; 201 208 202 - const sink: types.sinkT<any> = signal => { 203 - if (deriving.isStart(signal)) { 204 - talkback = deriving.unboxStart(signal); 205 - } else { 209 + const sink: Sink<any> = signal => { 210 + if (signal === SignalKind.End) { 206 211 signals.push(signal); 207 - if (deriving.isPush(signal)) talkback(deriving.pull); 208 - if (deriving.isEnd(signal)) ending++; 212 + ending++; 213 + } else if (signal.tag === SignalKind.Push) { 214 + signals.push(signal); 215 + talkback!(TalkbackKind.Pull); 216 + } else { 217 + talkback = signal[0]; 209 218 } 210 219 }; 211 220 212 221 operator(source)(sink); 213 222 214 223 // When pushing a value we expect an immediate Push then End signal 215 - talkback(deriving.pull); 224 + talkback!(TalkbackKind.Pull); 216 225 jest.runAllTimers(); 217 226 expect(ending).toBe(1); 218 227 expect(pulls).toBe(3); 219 228 expect(signals).toEqual([ 220 - deriving.push(result), 221 - deriving.push(result), 222 - deriving.end() 229 + push(result), 230 + push(result), 231 + SignalKind.End 223 232 ]); 224 233 }); 234 + }; 225 235 226 236 /* This tests a noop operator for Start signals from the source. 227 237 When the operator's sink is started by the source it'll receive 228 238 a Start event. As a response it should never send more than one 229 239 Start signals to the sink. */ 230 - const passesSingleStart = (operator: types.operatorT<any, any>) => 240 + const passesSingleStart = (operator: Operator<any, any>) => { 231 241 it('sends a single Start event to the incoming sink (spec)', () => { 232 - let start = 0; 242 + let starts = 0; 233 243 234 - const source: types.sourceT<any> = sink => { 235 - sink(deriving.start(() => {})); 244 + const source: Source<any> = sink => { 245 + sink(start(() => {})); 236 246 }; 237 247 238 - const sink: types.sinkT<any> = signal => { 239 - if (deriving.isStart(signal)) start++; 248 + const sink: Sink<any> = signal => { 249 + if (signal !== SignalKind.End && signal.tag === SignalKind.Start) { 250 + starts++; 251 + } 240 252 }; 241 253 242 254 // When starting the operator we expect a single start event on the sink 243 255 operator(source)(sink); 244 - expect(start).toBe(1); 256 + expect(starts).toBe(1); 245 257 }); 258 + }; 246 259 247 260 /* This tests a noop operator for silence after End signals from the source. 248 261 When the operator receives the End signal it shouldn't forward any other ··· 250 263 This isn't a strict requirement, but some operators should ensure that 251 264 all sources are well behaved. This is particularly true for operators 252 265 that either Close sources themselves or may operate on multiple sources. */ 253 - const passesStrictEnd = (operator: types.operatorT<any, any>) => { 266 + const passesStrictEnd = (operator: Operator<any, any>) => { 254 267 it('stops all signals after End has been received (spec: strict end)', () => { 255 268 let pulls = 0; 256 - const signals = []; 269 + const signals: Signal<any>[] = []; 257 270 258 - const source: types.sourceT<any> = sink => { 259 - sink(deriving.start(tb => { 260 - if (tb === deriving.pull) { 271 + const source: Source<any> = sink => { 272 + sink(start((signal) => { 273 + if (signal === TalkbackKind.Pull) { 261 274 pulls++; 262 - sink(deriving.end()); 263 - sink(deriving.push(123)); 275 + sink(SignalKind.End); 276 + sink(push(123)); 264 277 } 265 278 })); 266 279 }; 267 280 268 - const sink: types.sinkT<any> = signal => { 269 - if (deriving.isStart(signal)) { 270 - deriving.unboxStart(signal)(deriving.pull); 271 - } else { 281 + const sink: Sink<any> = signal => { 282 + if (signal === SignalKind.End) { 283 + signals.push(signal); 284 + } else if (signal.tag === SignalKind.Push) { 272 285 signals.push(signal); 286 + } else { 287 + signal[0](TalkbackKind.Pull); 273 288 } 274 289 }; 275 290 ··· 277 292 278 293 // The Push signal should've been dropped 279 294 jest.runAllTimers(); 280 - expect(signals).toEqual([deriving.end()]); 295 + expect(signals).toEqual([SignalKind.End]); 281 296 expect(pulls).toBe(1); 282 297 }); 283 298 284 299 it('stops all signals after Close has been received (spec: strict close)', () => { 285 - const signals = []; 300 + const signals: Signal<any>[] = []; 286 301 287 - const source: types.sourceT<any> = sink => { 288 - sink(deriving.start(tb => { 289 - if (tb === deriving.close) { 290 - sink(deriving.push(123)); 302 + const source: Source<any> = sink => { 303 + sink(start((signal) => { 304 + if (signal === TalkbackKind.Close) { 305 + sink(push(123)); 291 306 } 292 307 })); 293 308 }; 294 309 295 - const sink: types.sinkT<any> = signal => { 296 - if (deriving.isStart(signal)) { 297 - deriving.unboxStart(signal)(deriving.close); 298 - } else { 310 + const sink: Sink<any> = signal => { 311 + if (signal === SignalKind.End) { 299 312 signals.push(signal); 313 + } else if (signal.tag === SignalKind.Push) { 314 + signals.push(signal); 315 + } else { 316 + signal[0](TalkbackKind.Close); 300 317 } 301 318 }; 302 319 ··· 313 330 When an operator closes immediately we expect to see a Close 314 331 signal at the source and an End signal to the sink, since the 315 332 closing operator is expected to end the entire chain. */ 316 - const passesCloseAndEnd = (closingOperator: types.operatorT<any, any>) => 333 + const passesCloseAndEnd = (closingOperator: Operator<any, any>) => { 317 334 it('closes the source and ends the sink correctly (spec: ending operator)', () => { 318 335 let closing = 0; 319 336 let ending = 0; 320 337 321 - const source: types.sourceT<any> = sink => { 322 - sink(deriving.start(tb => { 338 + const source: Source<any> = sink => { 339 + sink(start((signal) => { 323 340 // For some operator tests we do need to send a single value 324 - if (tb === deriving.pull) 325 - sink(deriving.push(null)); 326 - if (tb === deriving.close) 341 + if (signal === TalkbackKind.Pull) { 342 + sink(push(null)); 343 + } else { 327 344 closing++; 345 + } 328 346 })); 329 347 }; 330 348 331 - const sink: types.sinkT<any> = signal => { 332 - if (deriving.isStart(signal)) { 333 - deriving.unboxStart(signal)(deriving.pull); 334 - } if (deriving.isEnd(signal)) { 349 + const sink: Sink<any> = signal => { 350 + if (signal === SignalKind.End) { 335 351 ending++; 352 + } else if (signal.tag === SignalKind.Start) { 353 + signal[0](TalkbackKind.Pull); 336 354 } 337 355 }; 338 356 ··· 341 359 expect(closing).toBe(1); 342 360 expect(ending).toBe(1); 343 361 }); 362 + }; 344 363 345 364 const passesAsyncSequence = ( 346 - operator: types.operatorT<any, any>, 365 + operator: Operator<any, any>, 347 366 result: any = 0 348 - ) => 367 + ) => { 349 368 it('passes an async push with an async end (spec)', () => { 350 369 let hasPushed = false; 351 - const signals = []; 370 + const signals: Signal<any>[] = []; 352 371 353 - const source: types.sourceT<any> = sink => { 354 - sink(deriving.start(tb => { 355 - if (tb === deriving.pull && !hasPushed) { 372 + const source: Source<any> = sink => { 373 + sink(start((signal) => { 374 + if (signal === TalkbackKind.Pull && !hasPushed) { 356 375 hasPushed = true; 357 - setTimeout(() => sink(deriving.push(0)), 10); 358 - setTimeout(() => sink(deriving.end()), 20); 376 + setTimeout(() => sink(push(0)), 10); 377 + setTimeout(() => sink(SignalKind.End), 20); 359 378 } 360 379 })); 361 380 }; 362 381 363 - const sink: types.sinkT<any> = signal => { 364 - if (deriving.isStart(signal)) { 382 + const sink: Sink<any> = signal => { 383 + if (signal === SignalKind.End) { 384 + signals.push(signal); 385 + } else if (signal.tag === SignalKind.Push) { 386 + signals.push(signal); 387 + } else { 365 388 setTimeout(() => { 366 - deriving.unboxStart(signal)(deriving.pull); 389 + signal[0](TalkbackKind.Pull); 367 390 }, 5); 368 - } else { 369 - signals.push(signal); 370 391 } 371 392 }; 372 393 ··· 379 400 jest.runAllTimers(); 380 401 381 402 expect(signals).toEqual([ 382 - deriving.push(result), 383 - deriving.end() 403 + push(result), 404 + SignalKind.End 384 405 ]); 385 406 }); 407 + }; 386 408 387 409 beforeEach(() => { 388 410 jest.useFakeTimers(); 389 411 }); 390 412 391 413 describe('combine', () => { 392 - const noop = (source: types.sourceT<any>) => operators.combine(sources.fromValue(0), source); 414 + const noop = (source: Source<any>) => operators.combine(sources.fromValue(0), source); 393 415 394 416 passesPassivePull(noop, [0, 0]); 395 417 passesActivePush(noop, [0, 0]); ··· 413 435 }); 414 436 415 437 describe('buffer', () => { 416 - const valueThenNever: types.sourceT<any> = sink => 417 - sink(deriving.start(tb => { 418 - if (tb === deriving.pull) 419 - sink(deriving.push(null)); 438 + const valueThenNever: Source<any> = sink => 439 + sink(start((signal) => { 440 + if (signal === TalkbackKind.Pull) 441 + sink(push(null)); 420 442 })); 421 443 422 444 const noop = operators.buffer(valueThenNever); ··· 471 493 472 494 expect(fn).toHaveBeenCalledTimes(6); 473 495 expect(fn.mock.calls).toEqual([ 474 - [deriving.start(expect.any(Function))], 475 - [deriving.push(1)], 476 - [deriving.push(2)], 477 - [deriving.push(3)], 478 - [deriving.push(4)], 479 - [deriving.end()], 496 + [start(expect.any(Function))], 497 + [push(1)], 498 + [push(2)], 499 + [push(3)], 500 + [push(4)], 501 + [SignalKind.End], 480 502 ]); 481 503 }); 482 504 483 505 // This synchronous test for concatMap will behave the same as mergeMap & switchMap 484 506 it('lets inner sources finish when outer source ends', () => { 485 - const values = []; 507 + const signals: Signal<any>[] = []; 486 508 const teardown = jest.fn(); 487 - const fn = (signal: types.signalT<any>) => { 488 - values.push(signal); 489 - if (deriving.isStart(signal)) { 490 - deriving.unboxStart(signal)(deriving.pull); 491 - deriving.unboxStart(signal)(deriving.close); 509 + const fn = (signal: Signal<any>) => { 510 + signals.push(signal); 511 + if (signal !== SignalKind.End && signal.tag === SignalKind.Start) { 512 + signal[0](TalkbackKind.Pull); 513 + signal[0](TalkbackKind.Close); 492 514 } 493 515 }; 494 516 ··· 497 519 })(sources.fromValue(null))(fn); 498 520 499 521 expect(teardown).toHaveBeenCalled(); 500 - expect(values).toEqual([ 501 - deriving.start(expect.any(Function)), 522 + expect(signals).toEqual([ 523 + start(expect.any(Function)), 502 524 ]); 503 525 }); 504 526 505 527 // This asynchronous test for concatMap will behave differently than mergeMap & switchMap 506 528 it('emits values from each flattened asynchronous source, one at a time', () => { 507 - const source = web.delay<number>(4)(sources.fromArray([1, 10])); 529 + const source = operators.delay<number>(4)(sources.fromArray([1, 10])); 508 530 const fn = jest.fn(); 509 531 510 532 sinks.forEach(fn)( 511 533 operators.concatMap((x: number) => { 512 - return web.delay(5)(sources.fromArray([x, x * 2])); 534 + return operators.delay(5)(sources.fromArray([x, x * 2])); 513 535 })(source) 514 536 ); 515 537 ··· 545 567 }); 546 568 547 569 it('emits synchronous values in order', () => { 548 - const values = []; 570 + const values: any[] = []; 549 571 550 572 sinks.forEach(x => values.push(x))( 551 573 operators.concat([ ··· 559 581 }); 560 582 561 583 describe('debounce', () => { 562 - const noop = web.debounce(() => 0); 584 + const noop = operators.debounce(() => 0); 563 585 passesPassivePull(noop); 564 586 passesActivePush(noop); 565 587 passesSinkClose(noop); ··· 572 594 const { source, next } = sources.makeSubject<number>(); 573 595 const fn = jest.fn(); 574 596 575 - sinks.forEach(fn)(web.debounce(() => 100)(source)); 597 + sinks.forEach(fn)(operators.debounce(() => 100)(source)); 576 598 577 599 next(1); 578 600 jest.advanceTimersByTime(50); ··· 590 612 const { source, next, complete } = sources.makeSubject<number>(); 591 613 const fn = jest.fn(); 592 614 593 - sinks.forEach(fn)(web.debounce(() => 100)(source)); 615 + sinks.forEach(fn)(operators.debounce(() => 100)(source)); 594 616 595 617 next(1); 596 618 complete(); ··· 600 622 }); 601 623 602 624 describe('delay', () => { 603 - const noop = web.delay(0); 625 + const noop = operators.delay(0); 604 626 passesPassivePull(noop); 605 627 passesActivePush(noop); 606 628 passesSinkClose(noop); ··· 612 634 const { source, next } = sources.makeSubject(); 613 635 const fn = jest.fn(); 614 636 615 - sinks.forEach(fn)(web.delay(100)(source)); 637 + sinks.forEach(fn)(operators.delay(100)(source)); 616 638 617 639 next(1); 618 640 expect(fn).not.toHaveBeenCalled(); ··· 687 709 complete(); 688 710 689 711 expect(fn.mock.calls).toEqual([ 690 - [deriving.start(expect.any(Function))], 691 - [deriving.push(1)], 692 - [deriving.push(2)], 693 - [deriving.push(3)], 694 - [deriving.push(4)], 695 - [deriving.end()], 712 + [start(expect.any(Function))], 713 + [push(1)], 714 + [push(2)], 715 + [push(3)], 716 + [push(4)], 717 + [SignalKind.End], 696 718 ]); 697 719 }); 698 720 699 721 // This synchronous test for mergeMap will behave the same as concatMap & switchMap 700 722 it('lets inner sources finish when outer source ends', () => { 701 - const values = []; 723 + const values: Signal<any>[] = []; 702 724 const teardown = jest.fn(); 703 - const fn = (signal: types.signalT<any>) => { 725 + const fn = (signal: Signal<any>) => { 704 726 values.push(signal); 705 - if (deriving.isStart(signal)) { 706 - deriving.unboxStart(signal)(deriving.pull); 707 - deriving.unboxStart(signal)(deriving.close); 727 + if (signal !== SignalKind.End && signal.tag === SignalKind.Start) { 728 + signal[0](TalkbackKind.Pull); 729 + signal[0](TalkbackKind.Close); 708 730 } 709 731 }; 710 732 ··· 714 736 715 737 expect(teardown).toHaveBeenCalled(); 716 738 expect(values).toEqual([ 717 - deriving.start(expect.any(Function)), 739 + start(expect.any(Function)), 718 740 ]); 719 741 }); 720 742 721 743 // This asynchronous test for mergeMap will behave differently than concatMap & switchMap 722 744 it('emits values from each flattened asynchronous source simultaneously', () => { 723 - const source = web.delay<number>(4)(sources.fromArray([1, 10])); 745 + const source = operators.delay<number>(4)(sources.fromArray([1, 10])); 724 746 const fn = jest.fn(); 725 747 726 748 sinks.forEach(fn)( 727 749 operators.mergeMap((x: number) => { 728 - return web.delay(5)(sources.fromArray([x, x * 2])); 750 + return operators.delay(5)(sources.fromArray([x, x * 2])); 729 751 })(source) 730 752 ); 731 753 ··· 739 761 }); 740 762 741 763 it('emits synchronous values in order', () => { 742 - const values = []; 764 + const values: any[] = []; 743 765 744 766 sinks.forEach(x => values.push(x))( 745 767 operators.merge([ ··· 763 785 passesAsyncSequence(noop); 764 786 765 787 it('calls a callback when the source ends', () => { 766 - const { source, next, complete } = sources.makeSubject<number>(); 788 + const { source, next, complete } = sources.makeSubject<any>(); 767 789 const fn = jest.fn(); 768 790 769 791 sinks.forEach(() => {})(operators.onEnd(fn)(source)); ··· 813 835 passesAsyncSequence(noop); 814 836 815 837 it('is called when the source starts', () => { 816 - let sink: types.sinkT<any>; 838 + let sink: Sink<any>; 817 839 818 840 const fn = jest.fn(); 819 - const source: types.sourceT<any> = _sink => { sink = _sink; }; 841 + const source: Source<any> = _sink => { sink = _sink; }; 820 842 821 843 sinks.forEach(() => {})(operators.onStart(fn)(source)); 822 844 823 845 expect(fn).not.toHaveBeenCalled(); 824 846 825 - sink(deriving.start(() => {})); 847 + sink!(start(() => {})); 826 848 expect(fn).toHaveBeenCalled(); 827 849 }); 828 850 }); 829 851 830 852 describe('sample', () => { 831 - const valueThenNever: types.sourceT<any> = sink => 832 - sink(deriving.start(tb => { 833 - if (tb === deriving.pull) 834 - sink(deriving.push(null)); 853 + const valueThenNever: Source<any> = sink => 854 + sink(start((signal) => { 855 + if (signal === TalkbackKind.Pull) 856 + sink(push(null)); 835 857 })); 836 858 837 859 const noop = operators.sample(valueThenNever); ··· 860 882 }); 861 883 862 884 describe('scan', () => { 863 - const noop = operators.scan((_acc, x) => x, null); 885 + const noop = operators.scan<any, any>((_acc, x) => x, null); 864 886 passesPassivePull(noop); 865 887 passesActivePush(noop); 866 888 passesSinkClose(noop); ··· 893 915 passesAsyncSequence(noop); 894 916 895 917 it('shares output values between sinks', () => { 896 - let push = () => {}; 918 + let onPush = () => {}; 897 919 898 - const source: types.sourceT<any> = operators.share(sink => { 899 - sink(deriving.start(() => {})); 900 - push = () => { 901 - sink(deriving.push([0])); 902 - sink(deriving.end()); 920 + const source: Source<any> = operators.share(sink => { 921 + sink(start(() => {})); 922 + onPush = () => { 923 + sink(push([0])); 924 + sink(SignalKind.End); 903 925 }; 904 926 }); 905 927 ··· 908 930 909 931 sinks.forEach(fnA)(source); 910 932 sinks.forEach(fnB)(source); 911 - push(); 933 + onPush(); 912 934 913 935 expect(fnA).toHaveBeenCalledWith([0]); 914 936 expect(fnB).toHaveBeenCalledWith([0]); ··· 976 998 const { source, next } = sources.makeSubject<number>(); 977 999 const fn = jest.fn(); 978 1000 979 - sinks.forEach(fn)(operators.skipWhile(x => x <= 1)(source)); 1001 + sinks.forEach(fn)(operators.skipWhile((x: any) => x <= 1)(source)); 980 1002 981 1003 next(1); 982 1004 expect(fn).not.toHaveBeenCalled(); ··· 1008 1030 1009 1031 expect(fn).toHaveBeenCalledTimes(6); 1010 1032 expect(fn.mock.calls).toEqual([ 1011 - [deriving.start(expect.any(Function))], 1012 - [deriving.push(1)], 1013 - [deriving.push(2)], 1014 - [deriving.push(3)], 1015 - [deriving.push(4)], 1016 - [deriving.end()], 1033 + [start(expect.any(Function))], 1034 + [push(1)], 1035 + [push(2)], 1036 + [push(3)], 1037 + [push(4)], 1038 + [SignalKind.End], 1017 1039 ]); 1018 1040 }); 1019 1041 1020 1042 // This synchronous test for switchMap will behave the same as concatMap & mergeMap 1021 1043 it('lets inner sources finish when outer source ends', () => { 1022 - const values = []; 1044 + const signals: Signal<any>[] = []; 1023 1045 const teardown = jest.fn(); 1024 - const fn = (signal: types.signalT<any>) => { 1025 - values.push(signal); 1026 - if (deriving.isStart(signal)) { 1027 - deriving.unboxStart(signal)(deriving.pull); 1028 - deriving.unboxStart(signal)(deriving.close); 1046 + const fn = (signal: Signal<any>) => { 1047 + signals.push(signal); 1048 + if (signal !== SignalKind.End && signal.tag === SignalKind.Start) { 1049 + signal[0](TalkbackKind.Pull); 1050 + signal[0](TalkbackKind.Close); 1029 1051 } 1030 1052 }; 1031 1053 ··· 1034 1056 })(sources.fromValue(null))(fn); 1035 1057 1036 1058 expect(teardown).toHaveBeenCalled(); 1037 - expect(values).toEqual([ 1038 - deriving.start(expect.any(Function)), 1059 + expect(signals).toEqual([ 1060 + start(expect.any(Function)), 1039 1061 ]); 1040 1062 }); 1041 1063 1042 1064 // This asynchronous test for switchMap will behave differently than concatMap & mergeMap 1043 1065 it('emits values from each flattened asynchronous source, one at a time', () => { 1044 - const source = web.delay<number>(4)(sources.fromArray([1, 10])); 1066 + const source = operators.delay<number>(4)(sources.fromArray([1, 10])); 1045 1067 const fn = jest.fn(); 1046 1068 1047 1069 sinks.forEach(fn)( 1048 1070 operators.switchMap((x: number) => ( 1049 - operators.take(2)(operators.map((y: number) => x * (y + 1))(web.interval(5))) 1071 + operators.take(2)(operators.map((y: number) => x * (y + 1))(sources.interval(5))) 1050 1072 ))(source) 1051 1073 ); 1052 1074 ··· 1080 1102 1081 1103 expect(fn).toHaveBeenCalledTimes(3); 1082 1104 expect(fn.mock.calls).toEqual([ 1083 - [deriving.start(expect.any(Function))], 1084 - [deriving.push(1)], 1085 - [deriving.end()], 1105 + [start(expect.any(Function))], 1106 + [push(1)], 1107 + [SignalKind.End], 1086 1108 ]); 1087 1109 }); 1088 1110 }); ··· 1101 1123 passesCloseAndEnd(ending); 1102 1124 1103 1125 it('emits values until a notifier emits', () => { 1104 - const { source: notifier$, next: notify } = sources.makeSubject<number>(); 1126 + const { source: notifier$, next: notify } = sources.makeSubject<any>(); 1105 1127 const { source: input$, next } = sources.makeSubject<number>(); 1106 1128 const fn = jest.fn(); 1107 1129 ··· 1110 1132 1111 1133 expect(fn).toHaveBeenCalledTimes(2); 1112 1134 expect(fn.mock.calls).toEqual([ 1113 - [deriving.start(expect.any(Function))], 1114 - [deriving.push(1)], 1135 + [start(expect.any(Function))], 1136 + [push(1)], 1115 1137 ]); 1116 1138 1117 1139 notify(null); 1118 1140 expect(fn).toHaveBeenCalledTimes(3); 1119 - expect(fn.mock.calls[2][0]).toEqual(deriving.end()); 1141 + expect(fn.mock.calls[2][0]).toEqual(SignalKind.End); 1120 1142 }); 1121 1143 }); 1122 1144 ··· 1136 1158 const { source, next } = sources.makeSubject<number>(); 1137 1159 const fn = jest.fn(); 1138 1160 1139 - operators.takeWhile(x => x < 2)(source)(fn); 1161 + operators.takeWhile((x: any) => x < 2)(source)(fn); 1140 1162 next(1); 1141 1163 next(2); 1142 1164 1143 1165 expect(fn.mock.calls).toEqual([ 1144 - [deriving.start(expect.any(Function))], 1145 - [deriving.push(1)], 1146 - [deriving.end()], 1166 + [start(expect.any(Function))], 1167 + [push(1)], 1168 + [SignalKind.End], 1147 1169 ]); 1148 1170 }); 1149 1171 }); ··· 1153 1175 1154 1176 it('emits the last max values of an ended source', () => { 1155 1177 const { source, next, complete } = sources.makeSubject<number>(); 1156 - const values = []; 1178 + const signals: Signal<any>[] = []; 1157 1179 1158 - let talkback; 1180 + let talkback: TalkbackFn; 1181 + 1159 1182 operators.takeLast(1)(source)(signal => { 1160 - values.push(signal); 1161 - if (deriving.isStart(signal)) 1162 - talkback = deriving.unboxStart(signal); 1163 - if (!deriving.isEnd(signal)) 1164 - talkback(deriving.pull); 1183 + signals.push(signal); 1184 + if (signal === SignalKind.End) { 1185 + /*noop*/ 1186 + } else if (signal.tag === SignalKind.Start) { 1187 + talkback = signal[0]; 1188 + } else { 1189 + talkback!(TalkbackKind.Pull); 1190 + } 1165 1191 }); 1166 1192 1167 1193 next(1); 1168 1194 next(2); 1169 1195 1170 - expect(values.length).toBe(0); 1196 + expect(signals.length).toBe(0); 1171 1197 complete(); 1172 1198 1173 - expect(values).toEqual([ 1174 - deriving.start(expect.any(Function)), 1175 - deriving.push(2), 1176 - deriving.end(), 1199 + expect(signals).toEqual([ 1200 + start(expect.any(Function)), 1201 + push(2), 1202 + SignalKind.End, 1177 1203 ]); 1178 1204 }); 1179 1205 }); 1180 1206 1181 1207 describe('throttle', () => { 1182 - const noop = web.throttle(() => 0); 1208 + const noop = operators.throttle(() => 0); 1183 1209 passesPassivePull(noop); 1184 1210 passesActivePush(noop); 1185 1211 passesSinkClose(noop); ··· 1191 1217 const { source, next } = sources.makeSubject<number>(); 1192 1218 const fn = jest.fn(); 1193 1219 1194 - sinks.forEach(fn)(web.throttle(() => 100)(source)); 1220 + sinks.forEach(fn)(operators.throttle(() => 100)(source)); 1195 1221 1196 1222 next(1); 1197 1223 expect(fn).toHaveBeenCalledWith(1);
-297
src/Wonka_sinks.test.ts
··· 1 - import * as deriving from './helpers/Wonka_deriving'; 2 - import * as sinks from './Wonka_sinks.gen'; 3 - import * as sources from './Wonka_sources.gen'; 4 - import * as web from './web/WonkaJs.gen'; 5 - import * as types from './Wonka_types.gen'; 6 - 7 - import Observable from 'zen-observable'; 8 - import callbagIterate from 'callbag-iterate'; 9 - import callbagTake from 'callbag-take'; 10 - 11 - describe('subscribe', () => { 12 - it('sends Pull talkback signals every Push signal', () => { 13 - let pulls = 0; 14 - const fn = jest.fn(); 15 - 16 - const source: types.sourceT<any> = sink => { 17 - sink(deriving.start(tb => { 18 - if (tb === deriving.pull) { 19 - if (pulls < 3) { 20 - pulls++; 21 - sink(deriving.push(0)); 22 - } else { 23 - sink(deriving.end()); 24 - expect(pulls).toBe(3); 25 - } 26 - } 27 - })); 28 - }; 29 - 30 - sinks.subscribe(fn)(source); 31 - expect(fn).toHaveBeenCalledTimes(3); 32 - expect(pulls).toBe(3); 33 - }); 34 - 35 - it('cancels when unsubscribe is called', () => { 36 - let pulls = 0; 37 - let closing = 0; 38 - 39 - const source: types.sourceT<any> = sink => { 40 - sink(deriving.start(tb => { 41 - if (tb === deriving.pull) { 42 - if (!pulls) { 43 - pulls++; 44 - sink(deriving.push(0)); 45 - } 46 - } else if (tb === deriving.close) { 47 - closing++; 48 - } 49 - })); 50 - }; 51 - 52 - const sub = sinks.subscribe(() => {})(source); 53 - expect(pulls).toBe(1); 54 - 55 - sub.unsubscribe(); 56 - expect(closing).toBe(1); 57 - }); 58 - 59 - it('ignores cancellation when the source has already ended', () => { 60 - let pulls = 0; 61 - let closing = 0; 62 - 63 - const source: types.sourceT<any> = sink => { 64 - sink(deriving.start(tb => { 65 - if (tb === deriving.pull) { 66 - pulls++; 67 - sink(deriving.end()); 68 - } else if (tb === deriving.close) { 69 - closing++; 70 - } 71 - })); 72 - }; 73 - 74 - const sub = sinks.subscribe(() => {})(source); 75 - expect(pulls).toBe(1); 76 - sub.unsubscribe(); 77 - expect(closing).toBe(0); 78 - }); 79 - 80 - it('ignores Push signals after the source has ended', () => { 81 - const fn = jest.fn(); 82 - const source: types.sourceT<any> = sink => { 83 - sink(deriving.start(tb => { 84 - if (tb === deriving.pull) { 85 - sink(deriving.end()); 86 - sink(deriving.push(0)); 87 - } 88 - })); 89 - }; 90 - 91 - sinks.subscribe(fn)(source); 92 - expect(fn).not.toHaveBeenCalled(); 93 - }); 94 - 95 - it('ignores Push signals after cancellation', () => { 96 - const fn = jest.fn(); 97 - const source: types.sourceT<any> = sink => { 98 - sink(deriving.start(tb => { 99 - if (tb === deriving.close) { 100 - sink(deriving.push(0)); 101 - } 102 - })); 103 - }; 104 - 105 - sinks.subscribe(fn)(source).unsubscribe(); 106 - expect(fn).not.toHaveBeenCalled(); 107 - }); 108 - }); 109 - 110 - describe('publish', () => { 111 - it('sends Pull talkback signals every Push signal', () => { 112 - let pulls = 0; 113 - const source: types.sourceT<any> = sink => { 114 - sink(deriving.start(tb => { 115 - if (tb === deriving.pull) { 116 - if (pulls < 3) { 117 - pulls++; 118 - sink(deriving.push(0)); 119 - } else { 120 - sink(deriving.end()); 121 - expect(pulls).toBe(3); 122 - } 123 - } 124 - })); 125 - }; 126 - 127 - sinks.publish(source); 128 - expect(pulls).toBe(3); 129 - }); 130 - }); 131 - 132 - describe('toArray', () => { 133 - it('sends Pull talkback signals every Push signal', () => { 134 - let pulls = 0; 135 - const source: types.sourceT<any> = sink => { 136 - sink(deriving.start(tb => { 137 - if (tb === deriving.pull) { 138 - if (pulls < 3) { 139 - pulls++; 140 - sink(deriving.push(0)); 141 - } else { 142 - sink(deriving.end()); 143 - expect(pulls).toBe(3); 144 - } 145 - } 146 - })); 147 - }; 148 - 149 - const array = sinks.toArray(source); 150 - expect(array).toEqual([0, 0, 0]); 151 - expect(pulls).toBe(3); 152 - }); 153 - 154 - it('sends a Close talkback signal after all synchronous values have been pulled', () => { 155 - let pulls = 0; 156 - let ending = 0; 157 - 158 - const source: types.sourceT<any> = sink => { 159 - sink(deriving.start(tb => { 160 - if (tb === deriving.pull) { 161 - if (!pulls) { 162 - pulls++; 163 - sink(deriving.push(0)); 164 - } 165 - } else if (tb === deriving.close) { 166 - ending++; 167 - } 168 - })); 169 - }; 170 - 171 - const array = sinks.toArray(source); 172 - expect(array).toEqual([0]); 173 - expect(ending).toBe(1); 174 - }); 175 - }); 176 - 177 - describe('toPromise', () => { 178 - it('creates a Promise that resolves on the last value', async () => { 179 - let pulls = 0; 180 - let sink = null; 181 - 182 - const source: types.sourceT<any> = _sink => { 183 - sink = _sink; 184 - sink(deriving.start(tb => { 185 - if (tb === deriving.pull) 186 - pulls++; 187 - })); 188 - }; 189 - 190 - const fn = jest.fn(); 191 - const promise = web.toPromise(source).then(fn); 192 - 193 - expect(pulls).toBe(1); 194 - sink(deriving.push(0)); 195 - expect(pulls).toBe(2); 196 - sink(deriving.push(1)); 197 - sink(deriving.end()); 198 - expect(fn).not.toHaveBeenCalled(); 199 - 200 - await promise; 201 - expect(fn).toHaveBeenCalledWith(1); 202 - }); 203 - 204 - it('creates a Promise for synchronous sources', async () => { 205 - const fn = jest.fn(); 206 - await web.toPromise(sources.fromArray([1, 2, 3])).then(fn); 207 - expect(fn).toHaveBeenCalledWith(3); 208 - }); 209 - }); 210 - 211 - describe('toObservable', () => { 212 - it('creates an Observable mirroring the Wonka source', () => { 213 - const next = jest.fn(); 214 - const complete = jest.fn(); 215 - let pulls = 0; 216 - let sink = null; 217 - 218 - const source: types.sourceT<any> = _sink => { 219 - sink = _sink; 220 - sink(deriving.start(tb => { 221 - if (tb === deriving.pull) 222 - pulls++; 223 - })); 224 - }; 225 - 226 - Observable.from(web.toObservable(source) as any).subscribe({ 227 - next, 228 - complete, 229 - }); 230 - 231 - expect(pulls).toBe(1); 232 - sink(deriving.push(0)); 233 - expect(next).toHaveBeenCalledWith(0); 234 - sink(deriving.push(1)); 235 - expect(next).toHaveBeenCalledWith(1); 236 - sink(deriving.end()); 237 - expect(complete).toHaveBeenCalled(); 238 - }); 239 - 240 - it('forwards cancellations from the Observable as a talkback', () => { 241 - let ending = 0; 242 - const source: types.sourceT<any> = sink => 243 - sink(deriving.start(tb => { 244 - if (tb === deriving.close) 245 - ending++; 246 - })); 247 - 248 - const sub = Observable.from(web.toObservable(source) as any).subscribe({}); 249 - 250 - expect(ending).toBe(0); 251 - sub.unsubscribe(); 252 - expect(ending).toBe(1); 253 - }); 254 - }); 255 - 256 - describe('toCallbag', () => { 257 - it('creates a Callbag mirroring the Wonka source', () => { 258 - const fn = jest.fn(); 259 - let pulls = 0; 260 - let sink = null; 261 - 262 - const source: types.sourceT<any> = _sink => { 263 - sink = _sink; 264 - sink(deriving.start(tb => { 265 - if (tb === deriving.pull) 266 - pulls++; 267 - })); 268 - }; 269 - 270 - callbagIterate(fn)(web.toCallbag(source)); 271 - 272 - expect(pulls).toBe(1); 273 - sink(deriving.push(0)); 274 - expect(fn).toHaveBeenCalledWith(0); 275 - sink(deriving.push(1)); 276 - expect(fn).toHaveBeenCalledWith(1); 277 - sink(deriving.end()); 278 - }); 279 - 280 - it('forwards cancellations from the Callbag as a talkback', () => { 281 - let ending = 0; 282 - const fn = jest.fn(); 283 - 284 - const source: types.sourceT<any> = sink => 285 - sink(deriving.start(tb => { 286 - if (tb === deriving.pull) 287 - sink(deriving.push(0)); 288 - if (tb === deriving.close) 289 - ending++; 290 - })); 291 - 292 - callbagIterate(fn)(callbagTake(1)(web.toCallbag(source) as any)); 293 - 294 - expect(fn.mock.calls).toEqual([[0]]); 295 - expect(ending).toBe(1); 296 - }); 297 - });
+126 -128
src/Wonka_sources.test.ts src/sources.test.ts
··· 1 - import * as deriving from './helpers/Wonka_deriving'; 2 - import * as sources from './Wonka_sources.gen'; 3 - import * as operators from './Wonka_operators.gen'; 4 - import * as types from './Wonka_types.gen'; 5 - import * as web from './web/WonkaJs.gen'; 1 + import { Source, Sink, Signal, SignalKind, TalkbackKind, TalkbackFn } from './types'; 2 + import { push, start, talkbackPlaceholder } from './helpers'; 3 + 4 + import * as sources from './sources'; 5 + import * as operators from './operators'; 6 + import * as callbag from './callbag'; 7 + import * as observable from './observable'; 6 8 7 9 import callbagFromArray from 'callbag-from-iter'; 8 10 import Observable from 'zen-observable'; 9 11 10 12 const collectSignals = ( 11 - source: types.sourceT<any>, 12 - onStart?: (talkbackCb: (tb: types.talkbackT) => void) => void 13 + source: Source<any>, 14 + onStart?: (talkbackCb: TalkbackFn) => void 13 15 ) => { 14 - let talkback = null; 15 - const signals = []; 16 - 16 + let talkback = talkbackPlaceholder; 17 + const signals: Signal<any>[] = []; 17 18 source(signal => { 18 19 signals.push(signal); 19 - if (deriving.isStart(signal)) { 20 - talkback = deriving.unboxStart(signal); 20 + if (signal === SignalKind.End) { 21 + /*noop*/ 22 + } else if (signal.tag === SignalKind.Start) { 23 + talkback = signal[0]; 21 24 if (onStart) onStart(talkback); 22 - talkback(deriving.pull); 23 - } else if (deriving.isPush(signal)) { 24 - talkback(deriving.pull); 25 + talkback(TalkbackKind.Pull); 26 + } else { 27 + talkback(TalkbackKind.Pull); 25 28 } 26 29 }) 27 30 ··· 29 32 }; 30 33 31 34 /* When a Close talkback signal is sent the source should immediately end */ 32 - const passesActiveClose = (source: types.sourceT<any>) => 35 + const passesActiveClose = (source: Source<any>) => { 33 36 it('stops emitting when a Close talkback signal is received (spec)', () => { 34 - let talkback = null; 35 - 36 - const sink: types.sinkT<any> = signal => { 37 - expect(deriving.isPush(signal)).toBeFalsy(); 38 - expect(deriving.isEnd(signal)).toBeFalsy(); 39 - if (deriving.isStart(signal)) { 40 - talkback = deriving.unboxStart(signal); 41 - talkback(deriving.close); 37 + let talkback: TalkbackFn | null = null; 38 + const sink: Sink<any> = signal => { 39 + expect(signal).not.toBe(SignalKind.End); 40 + expect((signal as any).tag).not.toBe(SignalKind.Push); 41 + if ((signal as any).tag === SignalKind.Start) { 42 + (talkback = signal[0])(TalkbackKind.Close); 42 43 } 43 44 }; 44 - 45 45 source(sink); 46 46 expect(talkback).not.toBe(null); 47 47 }); 48 + }; 48 49 49 50 /* All synchronous, cold sources won't send anything unless a Pull signal 50 51 has been received. */ 51 - const passesColdPull = (source: types.sourceT<any>) => 52 + const passesColdPull = (source: Source<any>) => { 52 53 it('sends nothing when no Pull talkback signal has been sent (spec)', () => { 54 + let talkback: TalkbackFn | null = null; 53 55 let pushes = 0; 54 - let talkback = null; 55 56 56 - const sink: types.sinkT<any> = signal => { 57 - if (deriving.isPush(signal)) { 57 + const sink: Sink<any> = signal => { 58 + if (signal === SignalKind.End) { 59 + /*noop*/ 60 + } else if (signal.tag === SignalKind.Push) { 58 61 pushes++; 59 - } else if (deriving.isStart(signal)) { 60 - talkback = deriving.unboxStart(signal); 62 + } else { 63 + talkback = signal[0]; 61 64 } 62 65 }; 63 66 ··· 67 70 68 71 setTimeout(() => { 69 72 expect(pushes).toBe(0); 70 - talkback(deriving.pull); 73 + talkback!(TalkbackKind.Pull); 71 74 }, 10); 72 75 73 76 jest.runAllTimers(); 74 77 expect(pushes).toBe(1); 75 78 }); 79 + }; 76 80 77 81 /* All synchronous, cold sources need to use trampoline scheduling to avoid 78 82 recursively sending more and more Push signals which would eventually lead 79 83 to a call stack overflow when too many values are emitted. */ 80 - const passesTrampoline = (source: types.sourceT<any>) => 84 + const passesTrampoline = (source: Source<any>) => { 81 85 it('uses trampoline scheduling instead of recursive push signals (spec)', () => { 82 - let talkback = null; 86 + let talkback: TalkbackFn | null = null; 83 87 let pushes = 0; 84 88 85 - const signals = []; 86 - const sink: types.sinkT<any> = signal => { 87 - if (deriving.isPush(signal)) { 88 - const lastPushes = ++pushes; 89 + const signals: Signal<any>[] = []; 90 + const sink: Sink<any> = signal => { 91 + if (signal === SignalKind.End) { 89 92 signals.push(signal); 90 - talkback(deriving.pull); 91 - expect(lastPushes).toBe(pushes); 92 - } else if (deriving.isStart(signal)) { 93 - talkback = deriving.unboxStart(signal); 94 - talkback(deriving.pull); 95 93 expect(pushes).toBe(2); 96 - } else if (deriving.isEnd(signal)) { 94 + } else if (signal.tag === SignalKind.Push) { 95 + const lastPushes = ++pushes; 97 96 signals.push(signal); 97 + talkback!(TalkbackKind.Pull); 98 + expect(lastPushes).toBe(pushes); 99 + } else if (signal.tag === SignalKind.Start) { 100 + (talkback = signal[0])(TalkbackKind.Pull); 98 101 expect(pushes).toBe(2); 99 102 } 100 103 }; 101 104 102 105 source(sink); 103 - 104 106 expect(signals).toEqual([ 105 - deriving.push(1), 106 - deriving.push(2), 107 - deriving.end(), 107 + push(1), 108 + push(2), 109 + SignalKind.End, 108 110 ]); 109 111 }); 112 + }; 110 113 111 114 beforeEach(() => { 112 115 jest.useFakeTimers(); ··· 118 121 passesActiveClose(sources.fromArray([0])); 119 122 }); 120 123 121 - describe('fromList', () => { 122 - passesTrampoline(sources.fromList({hd: 1, tl: { hd: 2, tl: 0 } } as any)); 123 - passesColdPull(sources.fromList({ hd: 0, tl: 0 } as any)); 124 - passesActiveClose(sources.fromList({ hd: 1, tl: 0 } as any)); 125 - }); 126 - 127 124 describe('fromValue', () => { 128 125 passesColdPull(sources.fromValue(0)); 129 126 passesActiveClose(sources.fromValue(0)); 130 127 131 128 it('sends a single value and ends', () => { 132 129 expect(collectSignals(sources.fromValue(1))).toEqual([ 133 - deriving.start(expect.any(Function)), 134 - deriving.push(1), 135 - deriving.end() 130 + start(expect.any(Function)), 131 + push(1), 132 + SignalKind.End, 136 133 ]); 137 134 }); 138 135 }); ··· 153 150 ]); 154 151 155 152 expect(collectSignals(source)).toEqual([ 156 - deriving.start(expect.any(Function)), 157 - deriving.push(0), 158 - deriving.end(), 153 + start(expect.any(Function)), 154 + push(0), 155 + end(), 159 156 ]); 160 157 }); 161 158 ··· 170 167 expect(onStart).toHaveBeenCalledTimes(2); 171 168 172 169 expect(signals).toEqual([ 173 - deriving.start(expect.any(Function)), 174 - deriving.push(1), 175 - deriving.push(2), 170 + start(expect.any(Function)), 171 + push(1), 172 + push(2), 176 173 ]); 177 174 }); 178 175 ··· 183 180 const source = operators.merge<any>([ 184 181 operators.onStart(onStart)(sources.fromValue(-1)), 185 182 operators.onStart(onStart)( 186 - operators.take(2)(web.interval(50)) 183 + operators.take(2)(sources.interval(50)) 187 184 ), 188 185 ]); 189 186 ··· 192 189 expect(onStart).toHaveBeenCalledTimes(2); 193 190 194 191 expect(signals).toEqual([ 195 - deriving.start(expect.any(Function)), 196 - deriving.push(-1), 197 - deriving.push(0), 198 - deriving.push(1), 199 - deriving.end(), 192 + start(expect.any(Function)), 193 + push(-1), 194 + push(0), 195 + push(1), 196 + SignalKind.End, 200 197 ]); 201 198 }); 202 199 }); ··· 217 214 ]); 218 215 219 216 expect(collectSignals(source)).toEqual([ 220 - deriving.start(expect.any(Function)), 221 - deriving.push(0), 222 - deriving.end(), 217 + start(expect.any(Function)), 218 + push(0), 219 + SignalKind.End, 223 220 ]); 224 221 }); 225 222 }); ··· 234 231 }); 235 232 236 233 const signals = collectSignals(source); 237 - expect(signals).toEqual([deriving.start(expect.any(Function))]); 234 + expect(signals).toEqual([start(expect.any(Function))]); 238 235 jest.runAllTimers(); 239 236 240 237 expect(signals).toEqual([ 241 - deriving.start(expect.any(Function)), 242 - deriving.push(1), 243 - deriving.end(), 238 + start(expect.any(Function)), 239 + push(1), 240 + SignalKind.End, 244 241 ]); 245 242 }); 246 243 ··· 248 245 const teardown = jest.fn(); 249 246 const source = sources.make(() => teardown); 250 247 251 - const sink: types.sinkT<any> = signal => { 252 - expect(deriving.isPush(signal)).toBeFalsy(); 253 - expect(deriving.isEnd(signal)).toBeFalsy(); 254 - if (deriving.isStart(signal)) 255 - setTimeout(() => deriving.unboxStart(signal)(deriving.close)); 248 + const sink: Sink<any> = signal => { 249 + expect(signal).not.toBe(SignalKind.End); 250 + expect((signal as any).tag).not.toBe(SignalKind.Push); 251 + setTimeout(() => signal[0](TalkbackKind.Close)); 256 252 }; 257 253 258 254 source(sink); ··· 268 264 const signals = collectSignals(source); 269 265 270 266 expect(signals).toEqual([ 271 - deriving.start(expect.any(Function)), 267 + start(expect.any(Function)), 272 268 ]); 273 269 274 270 next(1); 275 271 276 272 expect(signals).toEqual([ 277 - deriving.start(expect.any(Function)), 278 - deriving.push(1), 273 + start(expect.any(Function)), 274 + push(1), 279 275 ]); 280 276 281 277 complete(); 282 278 283 279 expect(signals).toEqual([ 284 - deriving.start(expect.any(Function)), 285 - deriving.push(1), 286 - deriving.end(), 280 + start(expect.any(Function)), 281 + push(1), 282 + SignalKind.End, 287 283 ]); 288 284 }); 289 285 ··· 293 289 complete(); 294 290 295 291 expect(signals).toEqual([ 296 - deriving.start(expect.any(Function)), 297 - deriving.end(), 292 + start(expect.any(Function)), 293 + SignalKind.End, 298 294 ]); 299 295 300 296 next(1); ··· 306 302 describe('never', () => { 307 303 it('emits nothing and ends immediately', () => { 308 304 const signals = collectSignals(sources.never); 309 - expect(signals).toEqual([deriving.start(expect.any(Function)) ]); 305 + expect(signals).toEqual([start(expect.any(Function)) ]); 310 306 }); 311 307 }); 312 308 ··· 315 311 const signals = collectSignals(sources.empty); 316 312 317 313 expect(signals).toEqual([ 318 - deriving.start(expect.any(Function)), 319 - deriving.end(), 314 + start(expect.any(Function)), 315 + SignalKind.End, 320 316 ]); 321 317 }); 322 318 }); 323 319 324 320 describe('fromPromise', () => { 325 - passesActiveClose(web.fromPromise(Promise.resolve(null))); 321 + passesActiveClose(sources.fromPromise(Promise.resolve(null))); 326 322 327 323 it('emits a value when the promise resolves', async () => { 328 324 const promise = Promise.resolve(1); 329 - const signals = collectSignals(web.fromPromise(promise)); 325 + const signals = collectSignals(sources.fromPromise(promise)); 330 326 331 327 expect(signals).toEqual([ 332 - deriving.start(expect.any(Function)), 328 + start(expect.any(Function)), 333 329 ]); 334 330 335 331 await promise; 336 332 337 333 expect(signals).toEqual([ 338 - deriving.start(expect.any(Function)), 339 - deriving.push(1), 340 - deriving.end(), 334 + start(expect.any(Function)), 335 + push(1), 336 + SignalKind.End, 341 337 ]); 342 338 }); 343 339 }); ··· 348 344 }); 349 345 350 346 it('converts an Observable to a Wonka source', async () => { 351 - const source = web.fromObservable(Observable.from([1, 2])); 347 + const source = observable.fromObservable(Observable.from([1, 2])); 352 348 const signals = collectSignals(source); 353 349 354 350 await new Promise(resolve => setTimeout(resolve)); 355 351 356 352 expect(signals).toEqual([ 357 - deriving.start(expect.any(Function)), 358 - deriving.push(1), 359 - deriving.push(2), 360 - deriving.end(), 353 + start(expect.any(Function)), 354 + push(1), 355 + push(2), 356 + SignalKind.End, 361 357 ]); 362 358 }); 363 359 364 360 it('supports cancellation on converted Observables', async () => { 365 - const source = web.fromObservable(Observable.from([1, 2])); 361 + const source = observable.fromObservable(Observable.from([1, 2])); 366 362 const signals = collectSignals(source, talkback => { 367 - talkback(deriving.close); 363 + talkback(TalkbackKind.Close); 368 364 }); 369 365 370 366 await new Promise(resolve => setTimeout(resolve)); 371 367 372 368 expect(signals).toEqual([ 373 - deriving.start(expect.any(Function)), 369 + start(expect.any(Function)), 374 370 ]); 375 371 }); 376 372 }); 377 373 378 374 describe('fromCallbag', () => { 379 375 it('converts a Callbag to a Wonka source', () => { 380 - const source = web.fromCallbag(callbagFromArray([1, 2])); 376 + const source = callbag.fromCallbag(callbagFromArray([1, 2]) as any); 381 377 const signals = collectSignals(source); 382 378 383 379 expect(signals).toEqual([ 384 - deriving.start(expect.any(Function)), 385 - deriving.push(1), 386 - deriving.push(2), 387 - deriving.end(), 380 + start(expect.any(Function)), 381 + push(1), 382 + push(2), 383 + SignalKind.End, 388 384 ]); 389 385 }); 390 386 391 387 it('supports cancellation on converted Observables', () => { 392 - const source = web.fromCallbag(callbagFromArray([1, 2])); 388 + const source = callbag.fromCallbag(callbagFromArray([1, 2]) as any); 393 389 const signals = collectSignals(source, talkback => { 394 - talkback(deriving.close); 390 + talkback(TalkbackKind.Close); 395 391 }); 396 392 397 393 expect(signals).toEqual([ 398 - deriving.start(expect.any(Function)), 394 + start(expect.any(Function)), 399 395 ]); 400 396 }); 401 397 }); ··· 403 399 describe('interval', () => { 404 400 it('emits Push signals until Cancel is sent', () => { 405 401 let pushes = 0; 406 - let talkback = null; 402 + let talkback: TalkbackFn | null = null; 407 403 408 - const sink: types.sinkT<any> = signal => { 409 - if (deriving.isPush(signal)) { 404 + const sink: Sink<any> = signal => { 405 + if (signal === SignalKind.End) { 406 + /*noop*/ 407 + } else if (signal.tag === SignalKind.Push) { 410 408 pushes++; 411 - } else if (deriving.isStart(signal)) { 412 - talkback = deriving.unboxStart(signal); 409 + } else { 410 + talkback = signal[0]; 413 411 } 414 412 }; 415 413 416 - web.interval(100)(sink); 414 + sources.interval(100)(sink); 417 415 expect(talkback).not.toBe(null); 418 416 expect(pushes).toBe(0); 419 417 ··· 422 420 jest.advanceTimersByTime(100); 423 421 expect(pushes).toBe(2); 424 422 425 - talkback(deriving.close); 423 + talkback!(TalkbackKind.Close); 426 424 jest.advanceTimersByTime(100); 427 425 expect(pushes).toBe(2); 428 426 }); ··· 430 428 431 429 describe('fromDomEvent', () => { 432 430 it('emits Push signals for events on a DOM element', () => { 433 - let talkback = null; 431 + let talkback: TalkbackFn | null = null; 434 432 435 433 const element = { 436 434 addEventListener: jest.fn(), 437 435 removeEventListener: jest.fn(), 438 436 }; 439 437 440 - const sink: types.sinkT<any> = signal => { 441 - expect(deriving.isEnd(signal)).toBeFalsy(); 442 - if (deriving.isStart(signal)) 443 - talkback = deriving.unboxStart(signal); 438 + const sink: Sink<any> = signal => { 439 + expect(signal).not.toBe(SignalKind.End); 440 + if ((signal as any).tag === SignalKind.Start) 441 + talkback = signal[0]; 444 442 }; 445 443 446 - web.fromDomEvent(element as any, 'click')(sink); 444 + sources.fromDomEvent(element as any, 'click')(sink); 447 445 448 446 expect(element.addEventListener).toHaveBeenCalledWith('click', expect.any(Function)); 449 447 expect(element.removeEventListener).not.toHaveBeenCalled(); ··· 451 449 452 450 listener(1); 453 451 listener(2); 454 - talkback(deriving.close); 452 + talkback!(TalkbackKind.Close); 455 453 expect(element.removeEventListener).toHaveBeenCalledWith('click', listener); 456 454 }); 457 455 });
+2 -2
src/callbag.ts
··· 7 7 (t: 2, d?: any): void; 8 8 } 9 9 10 - export function fromCallbag<T>(callbag: Callbag<void, T>): Source<T> { 10 + export function fromCallbag<T>(callbag: Callbag<any, T>): Source<T> { 11 11 return (sink) => { 12 12 callbag(0, (signal: number, data: any) => { 13 13 if (signal === 0) { ··· 27 27 }; 28 28 } 29 29 30 - export function toCallbag<T>(source: Source<T>): Callbag<void, T> { 30 + export function toCallbag<T>(source: Source<T>): Callbag<any, T> { 31 31 return (signal: number, sink: any) => { 32 32 if (signal === 0) { 33 33 source((signal) => {
+300
src/sinks.test.ts
··· 1 + import { Source, Sink, SignalKind, TalkbackKind } from './types'; 2 + import { push, start } from './helpers'; 3 + 4 + import * as sinks from './sinks'; 5 + import * as sources from './sources'; 6 + import * as callbag from './callbag'; 7 + import * as observable from './observable'; 8 + 9 + import Observable from 'zen-observable'; 10 + import callbagIterate from 'callbag-iterate'; 11 + import callbagTake from 'callbag-take'; 12 + 13 + describe('subscribe', () => { 14 + it('sends Pull talkback signals every Push signal', () => { 15 + let pulls = 0; 16 + const fn = jest.fn(); 17 + 18 + const source: Source<any> = (sink) => { 19 + sink(start((signal) => { 20 + if (signal === TalkbackKind.Pull) { 21 + if (pulls < 3) { 22 + pulls++; 23 + sink(push(0)); 24 + } else { 25 + sink(SignalKind.End); 26 + expect(pulls).toBe(3); 27 + } 28 + } 29 + })); 30 + }; 31 + 32 + sinks.subscribe(fn)(source); 33 + expect(fn).toHaveBeenCalledTimes(3); 34 + expect(pulls).toBe(3); 35 + }); 36 + 37 + it('cancels when unsubscribe is called', () => { 38 + let pulls = 0; 39 + let closing = 0; 40 + 41 + const source: Source<any> = (sink) => { 42 + sink(start((signal) => { 43 + if (signal === TalkbackKind.Pull) { 44 + if (!pulls) { 45 + pulls++; 46 + sink(push(0)); 47 + } 48 + } else { 49 + closing++; 50 + } 51 + })); 52 + }; 53 + 54 + const sub = sinks.subscribe(() => {})(source); 55 + expect(pulls).toBe(1); 56 + 57 + sub.unsubscribe(); 58 + expect(closing).toBe(1); 59 + }); 60 + 61 + it('ignores cancellation when the source has already ended', () => { 62 + let pulls = 0; 63 + let closing = 0; 64 + 65 + const source: Source<any> = (sink) => { 66 + sink(start((signal) => { 67 + if (signal === TalkbackKind.Pull) { 68 + pulls++; 69 + sink(SignalKind.End); 70 + } else { 71 + closing++; 72 + } 73 + })); 74 + }; 75 + 76 + const sub = sinks.subscribe(() => {})(source); 77 + expect(pulls).toBe(1); 78 + sub.unsubscribe(); 79 + expect(closing).toBe(0); 80 + }); 81 + 82 + it('ignores Push signals after the source has ended', () => { 83 + const fn = jest.fn(); 84 + const source: Source<any> = (sink) => { 85 + sink(start((signal) => { 86 + if (signal === TalkbackKind.Pull) { 87 + sink(SignalKind.End); 88 + sink(push(0)); 89 + } 90 + })); 91 + }; 92 + 93 + sinks.subscribe(fn)(source); 94 + expect(fn).not.toHaveBeenCalled(); 95 + }); 96 + 97 + it('ignores Push signals after cancellation', () => { 98 + const fn = jest.fn(); 99 + const source: Source<any> = (sink) => { 100 + sink(start((signal) => { 101 + if (signal === TalkbackKind.Close) { 102 + sink(push(0)); 103 + } 104 + })); 105 + }; 106 + 107 + sinks.subscribe(fn)(source).unsubscribe(); 108 + expect(fn).not.toHaveBeenCalled(); 109 + }); 110 + }); 111 + 112 + describe('publish', () => { 113 + it('sends Pull talkback signals every Push signal', () => { 114 + let pulls = 0; 115 + const source: Source<any> = (sink) => { 116 + sink(start((signal) => { 117 + if (signal === TalkbackKind.Pull) { 118 + if (pulls < 3) { 119 + pulls++; 120 + sink(push(0)); 121 + } else { 122 + sink(SignalKind.End); 123 + expect(pulls).toBe(3); 124 + } 125 + } 126 + })); 127 + }; 128 + 129 + sinks.publish(source); 130 + expect(pulls).toBe(3); 131 + }); 132 + }); 133 + 134 + describe('toArray', () => { 135 + it('sends Pull talkback signals every Push signal', () => { 136 + let pulls = 0; 137 + const source: Source<any> = (sink) => { 138 + sink(start((signal) => { 139 + if (signal === TalkbackKind.Pull) { 140 + if (pulls < 3) { 141 + pulls++; 142 + sink(push(0)); 143 + } else { 144 + sink(SignalKind.End); 145 + expect(pulls).toBe(3); 146 + } 147 + } 148 + })); 149 + }; 150 + 151 + const array = sinks.toArray(source); 152 + expect(array).toEqual([0, 0, 0]); 153 + expect(pulls).toBe(3); 154 + }); 155 + 156 + it('sends a Close talkback signal after all synchronous values have been pulled', () => { 157 + let pulls = 0; 158 + let ending = 0; 159 + 160 + const source: Source<any> = (sink) => { 161 + sink(start((signal) => { 162 + if (signal === TalkbackKind.Pull) { 163 + if (!pulls) { 164 + pulls++; 165 + sink(push(0)); 166 + } 167 + } else { 168 + ending++; 169 + } 170 + })); 171 + }; 172 + 173 + const array = sinks.toArray(source); 174 + expect(array).toEqual([0]); 175 + expect(ending).toBe(1); 176 + }); 177 + }); 178 + 179 + describe('toPromise', () => { 180 + it('creates a Promise that resolves on the last value', async () => { 181 + let pulls = 0; 182 + let sink: Sink<any> | null = null; 183 + 184 + const source: Source<any> = (_sink) => { 185 + sink = _sink; 186 + sink(start((signal) => { 187 + if (signal === TalkbackKind.Pull) 188 + pulls++; 189 + })); 190 + }; 191 + 192 + const fn = jest.fn(); 193 + const promise = sinks.toPromise(source).then(fn); 194 + 195 + expect(pulls).toBe(1); 196 + sink!(push(0)); 197 + expect(pulls).toBe(2); 198 + sink!(push(1)); 199 + sink!(SignalKind.End); 200 + expect(fn).not.toHaveBeenCalled(); 201 + 202 + await promise; 203 + expect(fn).toHaveBeenCalledWith(1); 204 + }); 205 + 206 + it('creates a Promise for synchronous sources', async () => { 207 + const fn = jest.fn(); 208 + await sinks.toPromise(sources.fromArray([1, 2, 3])).then(fn); 209 + expect(fn).toHaveBeenCalledWith(3); 210 + }); 211 + }); 212 + 213 + describe('toObservable', () => { 214 + it('creates an Observable mirroring the Wonka source', () => { 215 + const next = jest.fn(); 216 + const complete = jest.fn(); 217 + let pulls = 0; 218 + let sink: Sink<any> | null = null; 219 + 220 + const source: Source<any> = _sink => { 221 + sink = _sink; 222 + sink(start((signal) => { 223 + if (signal === TalkbackKind.Pull) 224 + pulls++; 225 + })); 226 + }; 227 + 228 + Observable.from(observable.toObservable(source) as any).subscribe({ 229 + next, 230 + complete, 231 + }); 232 + 233 + expect(pulls).toBe(1); 234 + sink!(push(0)); 235 + expect(next).toHaveBeenCalledWith(0); 236 + sink!(push(1)); 237 + expect(next).toHaveBeenCalledWith(1); 238 + sink!(SignalKind.End); 239 + expect(complete).toHaveBeenCalled(); 240 + }); 241 + 242 + it('forwards cancellations from the Observable as a talkback', () => { 243 + let ending = 0; 244 + const source: Source<T> = sink => 245 + sink(start((signal) => { 246 + if (signal === TalkbackKind.Close) 247 + ending++; 248 + })); 249 + 250 + const sub = Observable.from(observable.toObservable(source) as any).subscribe({}); 251 + 252 + expect(ending).toBe(0); 253 + sub.unsubscribe(); 254 + expect(ending).toBe(1); 255 + }); 256 + }); 257 + 258 + describe('toCallbag', () => { 259 + it('creates a Callbag mirroring the Wonka source', () => { 260 + const fn = jest.fn(); 261 + let pulls = 0; 262 + let sink: Sink<any> | null = null; 263 + 264 + const source: Source<any> = _sink => { 265 + sink = _sink; 266 + sink(start((signal) => { 267 + if (signal === TalkbackKind.Pull) 268 + pulls++; 269 + })); 270 + }; 271 + 272 + callbagIterate(fn)(callbag.toCallbag(source)); 273 + 274 + expect(pulls).toBe(1); 275 + sink!(push(0)); 276 + expect(fn).toHaveBeenCalledWith(0); 277 + sink!(push(1)); 278 + expect(fn).toHaveBeenCalledWith(1); 279 + sink!(SignalKind.End); 280 + }); 281 + 282 + it('forwards cancellations from the Callbag as a talkback', () => { 283 + let ending = 0; 284 + const fn = jest.fn(); 285 + 286 + const source: Source<any> = sink => 287 + sink(start((signal) => { 288 + if (signal === TalkbackKind.Pull) { 289 + sink(push(0)); 290 + } else { 291 + ending++; 292 + } 293 + })); 294 + 295 + callbagIterate(fn)(callbagTake(1)(callbag.toCallbag(source) as any)); 296 + 297 + expect(fn.mock.calls).toEqual([[0]]); 298 + expect(ending).toBe(1); 299 + }); 300 + });
+2 -4
src/sinks.ts
··· 32 32 }; 33 33 } 34 34 35 - export function publish<T>() { 36 - return (source: Source<T>): void => { 37 - subscribe((_value) => {/*noop*/})(source); 38 - }; 35 + export function publish<T>(source: Source<T>): void { 36 + subscribe((_value) => {/*noop*/})(source); 39 37 } 40 38 41 39 export function toArray<T>(source: Source<T>): T[] {
+34
yarn.lock
··· 652 652 dependencies: 653 653 "@types/yargs-parser" "*" 654 654 655 + "@types/zen-observable@^0.8.3": 656 + version "0.8.3" 657 + resolved "https://registry.yarnpkg.com/@types/zen-observable/-/zen-observable-0.8.3.tgz#781d360c282436494b32fe7d9f7f8e64b3118aa3" 658 + integrity sha512-fbF6oTd4sGGy0xjHPKAt+eS2CrxJ3+6gQ3FGcBoIJR2TLAyCkCyI8JqZNy+FeON0AhVgNJoUumVoZQjBFUqHkw== 659 + 655 660 ansi-escapes@^4.2.1: 656 661 version "4.3.2" 657 662 resolved "https://registry.yarnpkg.com/ansi-escapes/-/ansi-escapes-4.3.2.tgz#6b2291d1db7d98b6521d5f1efa42d0f3a9feb65e" ··· 799 804 version "1.1.2" 800 805 resolved "https://registry.yarnpkg.com/buffer-from/-/buffer-from-1.1.2.tgz#2b146a6fd72e80b4f55d255f35ed59a3a9a41bd5" 801 806 integrity sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ== 807 + 808 + callbag-from-iter@^1.3.0: 809 + version "1.3.0" 810 + resolved "https://registry.yarnpkg.com/callbag-from-iter/-/callbag-from-iter-1.3.0.tgz#6757c1de9f8558b877546d475e3082823662c54c" 811 + integrity sha512-iBc6O6DYDWQBjlOZzdU/SbJofK0pN3TjNWC4fDIQePBk5FWCZKVll/coXiVU6gpwbBaS0Om/d/PVbVK8Ki17Ew== 812 + dependencies: 813 + callbag "^1.2.0" 814 + 815 + callbag-iterate@^1.0.0: 816 + version "1.0.0" 817 + resolved "https://registry.yarnpkg.com/callbag-iterate/-/callbag-iterate-1.0.0.tgz#97116f09296ef2d5073b35125891dca93349aeeb" 818 + integrity sha512-bynCbDuqGZkj1mXAhGr8jMf8Vhifps+G+pF3xlcz3jcaZLNXHghVjValnJtBTg2N74cyl347UzagJ8niJpyF6Q== 819 + 820 + callbag-take@^1.5.0: 821 + version "1.5.0" 822 + resolved "https://registry.yarnpkg.com/callbag-take/-/callbag-take-1.5.0.tgz#3062b42f8ed4ed7e021d2cd1306c029d2ef4a12d" 823 + integrity sha512-8aOxp+gzfVQtDe+tk9PhKbC9QR9Vap4KFA0xccUiXFK9VjIS0fSt/Yi454viPpMhJkhRcx1BsjyF34Cj57W89A== 824 + dependencies: 825 + callbag "^1.1.0" 826 + 827 + callbag@^1.1.0, callbag@^1.2.0: 828 + version "1.5.1" 829 + resolved "https://registry.yarnpkg.com/callbag/-/callbag-1.5.1.tgz#383464c8c2e81a584b8d7df9feace4f7720409f1" 830 + integrity sha512-FGpkXYZ018Wpeevhsp7R2iABZuDamg54TaObKI48JBNGzMlkb9bRwS54eR0sgFksQpspsCan4iaaVbAbxm9MIg== 802 831 803 832 callsites@^3.0.0: 804 833 version "3.1.0" ··· 2147 2176 version "0.1.0" 2148 2177 resolved "https://registry.yarnpkg.com/yocto-queue/-/yocto-queue-0.1.0.tgz#0294eb3dee05028d31ee1a5fa2c556a6aaf10a1b" 2149 2178 integrity sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q== 2179 + 2180 + zen-observable@^0.8.15: 2181 + version "0.8.15" 2182 + resolved "https://registry.yarnpkg.com/zen-observable/-/zen-observable-0.8.15.tgz#96415c512d8e3ffd920afd3889604e30b9eaac15" 2183 + integrity sha512-PQ2PC7R9rslx84ndNBZB/Dkv8V8fZEpk83RLgXtYd0fwUgEjseMn1Dgajh2x6S8QbZAFa9p2qVCEuYZNgve0dQ==