···11+import { Source, Sink, Operator, Signal, SignalKind, TalkbackKind, TalkbackFn } from '../types';
22+import { push, start } from '../helpers';
33+44+/* This tests a noop operator for passive Pull talkback signals.
55+ A Pull will be sent from the sink upwards and should pass through
66+ the operator until the source receives it, which then pushes a
77+ value down. */
88+export const passesPassivePull = (operator: Operator<any, any>, output: any = 0) => {
99+ it('responds to Pull talkback signals (spec)', () => {
1010+ let talkback: TalkbackFn | null = null;
1111+ let pushes = 0;
1212+ const values: any[] = [];
1313+1414+ const source: Source<any> = sink => {
1515+ sink(
1616+ start(signal => {
1717+ if (!pushes && signal === TalkbackKind.Pull) {
1818+ pushes++;
1919+ sink(push(0));
2020+ }
2121+ })
2222+ );
2323+ };
2424+2525+ const sink: Sink<any> = signal => {
2626+ expect(signal).not.toBe(SignalKind.End);
2727+ if (signal === SignalKind.End) {
2828+ /*noop*/
2929+ } else if (signal.tag === SignalKind.Push) {
3030+ values.push(signal[0]);
3131+ } else {
3232+ talkback = signal[0];
3333+ }
3434+ };
3535+3636+ operator(source)(sink);
3737+ // The Start signal should always come in immediately
3838+ expect(talkback).not.toBe(null);
3939+ // No Push signals should be issued initially
4040+ expect(values).toEqual([]);
4141+4242+ // When pulling a value we expect an immediate response
4343+ talkback!(TalkbackKind.Pull);
4444+ jest.runAllTimers();
4545+ expect(values).toEqual([output]);
4646+ });
4747+};
4848+4949+/* This tests a noop operator for regular, active Push signals.
5050+ A Push will be sent downwards from the source, through the
5151+ operator to the sink. Pull events should be let through from
5252+ the sink after every Push event. */
5353+export const passesActivePush = (operator: Operator<any, any>, result: any = 0) => {
5454+ it('responds to eager Push signals (spec)', () => {
5555+ const values: any[] = [];
5656+ let talkback: TalkbackFn | null = null;
5757+ let sink: Sink<any> | null = null;
5858+ let pulls = 0;
5959+6060+ const source: Source<any> = _sink => {
6161+ (sink = _sink)(
6262+ start(signal => {
6363+ if (signal === TalkbackKind.Pull) pulls++;
6464+ })
6565+ );
6666+ };
6767+6868+ operator(source)(signal => {
6969+ expect(signal).not.toBe(SignalKind.End);
7070+ if (signal === SignalKind.End) {
7171+ /*noop*/
7272+ } else if (signal.tag === SignalKind.Start) {
7373+ talkback = signal[0];
7474+ } else if (signal.tag === SignalKind.Push) {
7575+ values.push(signal[0]);
7676+ talkback!(TalkbackKind.Pull);
7777+ }
7878+ });
7979+8080+ // No Pull signals should be issued initially
8181+ expect(pulls).toBe(0);
8282+8383+ // When pushing a value we expect an immediate response
8484+ sink!(push(0));
8585+ jest.runAllTimers();
8686+ expect(values).toEqual([result]);
8787+ // Subsequently the Pull signal should have travelled upwards
8888+ expect(pulls).toBe(1);
8989+ });
9090+};
9191+9292+/* This tests a noop operator for Close talkback signals from the sink.
9393+ A Close signal will be sent, which should be forwarded to the source,
9494+ which then ends the communication without sending an End signal. */
9595+export const passesSinkClose = (operator: Operator<any, any>) => {
9696+ it('responds to Close signals from sink (spec)', () => {
9797+ let talkback: TalkbackFn | null = null;
9898+ let closing = 0;
9999+100100+ const source: Source<any> = sink => {
101101+ sink(
102102+ start(signal => {
103103+ if (signal === TalkbackKind.Pull && !closing) {
104104+ sink(push(0));
105105+ } else if (signal === TalkbackKind.Close) {
106106+ closing++;
107107+ }
108108+ })
109109+ );
110110+ };
111111+112112+ const sink: Sink<any> = signal => {
113113+ expect(signal).not.toBe(SignalKind.End);
114114+ if (signal === SignalKind.End) {
115115+ /*noop*/
116116+ } else if (signal.tag === SignalKind.Push) {
117117+ talkback!(TalkbackKind.Close);
118118+ } else {
119119+ talkback = signal[0];
120120+ }
121121+ };
122122+123123+ operator(source)(sink);
124124+125125+ // When pushing a value we expect an immediate close signal
126126+ talkback!(TalkbackKind.Pull);
127127+ jest.runAllTimers();
128128+ expect(closing).toBe(1);
129129+ });
130130+};
131131+132132+/* This tests a noop operator for End signals from the source.
133133+ A Push and End signal will be sent after the first Pull talkback
134134+ signal from the sink, which shouldn't lead to any extra Close or Pull
135135+ talkback signals. */
136136+export const passesSourceEnd = (operator: Operator<any, any>, result: any = 0) => {
137137+ it('passes on immediate Push then End signals from source (spec)', () => {
138138+ const signals: Signal<any>[] = [];
139139+ let talkback: TalkbackFn | null = null;
140140+ let pulls = 0;
141141+ let ending = 0;
142142+143143+ const source: Source<any> = sink => {
144144+ sink(
145145+ start(signal => {
146146+ expect(signal).not.toBe(TalkbackKind.Close);
147147+ if (signal === TalkbackKind.Pull) {
148148+ pulls++;
149149+ if (pulls === 1) {
150150+ sink(push(0));
151151+ sink(SignalKind.End);
152152+ }
153153+ }
154154+ })
155155+ );
156156+ };
157157+158158+ const sink: Sink<any> = signal => {
159159+ if (signal === SignalKind.End) {
160160+ signals.push(signal);
161161+ ending++;
162162+ } else if (signal.tag === SignalKind.Push) {
163163+ signals.push(signal);
164164+ } else {
165165+ talkback = signal[0];
166166+ }
167167+ };
168168+169169+ operator(source)(sink);
170170+171171+ // When pushing a value we expect an immediate Push then End signal
172172+ talkback!(TalkbackKind.Pull);
173173+ jest.runAllTimers();
174174+ expect(ending).toBe(1);
175175+ expect(signals).toEqual([push(result), SignalKind.End]);
176176+ // Also no additional pull event should be created by the operator
177177+ expect(pulls).toBe(1);
178178+ });
179179+};
180180+181181+/* This tests a noop operator for End signals from the source
182182+ after the first pull in response to another.
183183+ This is similar to passesSourceEnd but more well behaved since
184184+ mergeMap/switchMap/concatMap are eager operators. */
185185+export const passesSourcePushThenEnd = (operator: Operator<any, any>, result: any = 0) => {
186186+ it('passes on End signals from source (spec)', () => {
187187+ const signals: Signal<any>[] = [];
188188+ let talkback: TalkbackFn | null = null;
189189+ let pulls = 0;
190190+ let ending = 0;
191191+192192+ const source: Source<any> = sink => {
193193+ sink(
194194+ start(signal => {
195195+ expect(signal).not.toBe(TalkbackKind.Close);
196196+ if (signal === TalkbackKind.Pull) {
197197+ pulls++;
198198+ if (pulls <= 2) {
199199+ sink(push(0));
200200+ } else {
201201+ sink(SignalKind.End);
202202+ }
203203+ }
204204+ })
205205+ );
206206+ };
207207+208208+ const sink: Sink<any> = signal => {
209209+ if (signal === SignalKind.End) {
210210+ signals.push(signal);
211211+ ending++;
212212+ } else if (signal.tag === SignalKind.Push) {
213213+ signals.push(signal);
214214+ talkback!(TalkbackKind.Pull);
215215+ } else {
216216+ talkback = signal[0];
217217+ }
218218+ };
219219+220220+ operator(source)(sink);
221221+222222+ // When pushing a value we expect an immediate Push then End signal
223223+ talkback!(TalkbackKind.Pull);
224224+ jest.runAllTimers();
225225+ expect(ending).toBe(1);
226226+ expect(pulls).toBe(3);
227227+ expect(signals).toEqual([push(result), push(result), SignalKind.End]);
228228+ });
229229+};
230230+231231+/* This tests a noop operator for Start signals from the source.
232232+ When the operator's sink is started by the source it'll receive
233233+ a Start event. As a response it should never send more than one
234234+ Start signals to the sink. */
235235+export const passesSingleStart = (operator: Operator<any, any>) => {
236236+ it('sends a single Start event to the incoming sink (spec)', () => {
237237+ let starts = 0;
238238+239239+ const source: Source<any> = sink => {
240240+ sink(start(() => {}));
241241+ };
242242+243243+ const sink: Sink<any> = signal => {
244244+ if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
245245+ starts++;
246246+ }
247247+ };
248248+249249+ // When starting the operator we expect a single start event on the sink
250250+ operator(source)(sink);
251251+ expect(starts).toBe(1);
252252+ });
253253+};
254254+255255+/* This tests a noop operator for silence after End signals from the source.
256256+ When the operator receives the End signal it shouldn't forward any other
257257+ signals to the sink anymore.
258258+ This isn't a strict requirement, but some operators should ensure that
259259+ all sources are well behaved. This is particularly true for operators
260260+ that either Close sources themselves or may operate on multiple sources. */
261261+export const passesStrictEnd = (operator: Operator<any, any>) => {
262262+ it('stops all signals after End has been received (spec: strict end)', () => {
263263+ let pulls = 0;
264264+ const signals: Signal<any>[] = [];
265265+266266+ const source: Source<any> = sink => {
267267+ sink(
268268+ start(signal => {
269269+ if (signal === TalkbackKind.Pull) {
270270+ pulls++;
271271+ sink(SignalKind.End);
272272+ sink(push(123));
273273+ }
274274+ })
275275+ );
276276+ };
277277+278278+ const sink: Sink<any> = signal => {
279279+ if (signal === SignalKind.End) {
280280+ signals.push(signal);
281281+ } else if (signal.tag === SignalKind.Push) {
282282+ signals.push(signal);
283283+ } else {
284284+ signal[0](TalkbackKind.Pull);
285285+ }
286286+ };
287287+288288+ operator(source)(sink);
289289+290290+ // The Push signal should've been dropped
291291+ jest.runAllTimers();
292292+ expect(signals).toEqual([SignalKind.End]);
293293+ expect(pulls).toBe(1);
294294+ });
295295+296296+ it('stops all signals after Close has been received (spec: strict close)', () => {
297297+ const signals: Signal<any>[] = [];
298298+299299+ const source: Source<any> = sink => {
300300+ sink(
301301+ start(signal => {
302302+ if (signal === TalkbackKind.Close) {
303303+ sink(push(123));
304304+ }
305305+ })
306306+ );
307307+ };
308308+309309+ const sink: Sink<any> = signal => {
310310+ if (signal === SignalKind.End) {
311311+ signals.push(signal);
312312+ } else if (signal.tag === SignalKind.Push) {
313313+ signals.push(signal);
314314+ } else {
315315+ signal[0](TalkbackKind.Close);
316316+ }
317317+ };
318318+319319+ operator(source)(sink);
320320+321321+ // The Push signal should've been dropped
322322+ jest.runAllTimers();
323323+ expect(signals).toEqual([]);
324324+ });
325325+};
326326+327327+/* This tests an immediately closing operator for End signals to
328328+ the sink and Close signals to the source.
329329+ When an operator closes immediately we expect to see a Close
330330+ signal at the source and an End signal to the sink, since the
331331+ closing operator is expected to end the entire chain. */
332332+export const passesCloseAndEnd = (closingOperator: Operator<any, any>) => {
333333+ it('closes the source and ends the sink correctly (spec: ending operator)', () => {
334334+ let closing = 0;
335335+ let ending = 0;
336336+337337+ const source: Source<any> = sink => {
338338+ sink(
339339+ start(signal => {
340340+ // For some operator tests we do need to send a single value
341341+ if (signal === TalkbackKind.Pull) {
342342+ sink(push(null));
343343+ } else {
344344+ closing++;
345345+ }
346346+ })
347347+ );
348348+ };
349349+350350+ const sink: Sink<any> = signal => {
351351+ if (signal === SignalKind.End) {
352352+ ending++;
353353+ } else if (signal.tag === SignalKind.Start) {
354354+ signal[0](TalkbackKind.Pull);
355355+ }
356356+ };
357357+358358+ // We expect the operator to immediately end and close
359359+ closingOperator(source)(sink);
360360+ expect(closing).toBe(1);
361361+ expect(ending).toBe(1);
362362+ });
363363+};
364364+365365+export const passesAsyncSequence = (operator: Operator<any, any>, result: any = 0) => {
366366+ it('passes an async push with an async end (spec)', () => {
367367+ let hasPushed = false;
368368+ const signals: Signal<any>[] = [];
369369+370370+ const source: Source<any> = sink => {
371371+ sink(
372372+ start(signal => {
373373+ if (signal === TalkbackKind.Pull && !hasPushed) {
374374+ hasPushed = true;
375375+ setTimeout(() => sink(push(0)), 10);
376376+ setTimeout(() => sink(SignalKind.End), 20);
377377+ }
378378+ })
379379+ );
380380+ };
381381+382382+ const sink: Sink<any> = signal => {
383383+ if (signal === SignalKind.End) {
384384+ signals.push(signal);
385385+ } else if (signal.tag === SignalKind.Push) {
386386+ signals.push(signal);
387387+ } else {
388388+ setTimeout(() => {
389389+ signal[0](TalkbackKind.Pull);
390390+ }, 5);
391391+ }
392392+ };
393393+394394+ // We initially expect to see the push signal
395395+ // Afterwards after all timers all other signals come in
396396+ operator(source)(sink);
397397+ expect(signals.length).toBe(0);
398398+ jest.advanceTimersByTime(5);
399399+ expect(hasPushed).toBeTruthy();
400400+ jest.runAllTimers();
401401+402402+ expect(signals).toEqual([push(result), SignalKind.End]);
403403+ });
404404+};
+13-403
src/__tests__/operators.test.ts
···11-import { Source, Sink, Operator, Signal, SignalKind, TalkbackKind, TalkbackFn } from '../types';
11+import { Source, Sink, Signal, SignalKind, TalkbackKind, TalkbackFn } from '../types';
22import { push, start } from '../helpers';
3344+import {
55+ passesPassivePull,
66+ passesActivePush,
77+ passesSinkClose,
88+ passesSourceEnd,
99+ passesSingleStart,
1010+ passesStrictEnd,
1111+ passesSourcePushThenEnd,
1212+ passesAsyncSequence,
1313+ passesCloseAndEnd,
1414+} from './compliance';
1515+416import * as sources from '../sources';
517import * as sinks from '../sinks';
618import * as operators from '../operators';
77-88-/* This tests a noop operator for passive Pull talkback signals.
99- A Pull will be sent from the sink upwards and should pass through
1010- the operator until the source receives it, which then pushes a
1111- value down. */
1212-const passesPassivePull = (operator: Operator<any, any>, output: any = 0) => {
1313- it('responds to Pull talkback signals (spec)', () => {
1414- let talkback: TalkbackFn | null = null;
1515- let pushes = 0;
1616- const values: any[] = [];
1717-1818- const source: Source<any> = sink => {
1919- sink(
2020- start(signal => {
2121- if (!pushes && signal === TalkbackKind.Pull) {
2222- pushes++;
2323- sink(push(0));
2424- }
2525- })
2626- );
2727- };
2828-2929- const sink: Sink<any> = signal => {
3030- expect(signal).not.toBe(SignalKind.End);
3131- if (signal === SignalKind.End) {
3232- /*noop*/
3333- } else if (signal.tag === SignalKind.Push) {
3434- values.push(signal[0]);
3535- } else {
3636- talkback = signal[0];
3737- }
3838- };
3939-4040- operator(source)(sink);
4141- // The Start signal should always come in immediately
4242- expect(talkback).not.toBe(null);
4343- // No Push signals should be issued initially
4444- expect(values).toEqual([]);
4545-4646- // When pulling a value we expect an immediate response
4747- talkback!(TalkbackKind.Pull);
4848- jest.runAllTimers();
4949- expect(values).toEqual([output]);
5050- });
5151-};
5252-5353-/* This tests a noop operator for regular, active Push signals.
5454- A Push will be sent downwards from the source, through the
5555- operator to the sink. Pull events should be let through from
5656- the sink after every Push event. */
5757-const passesActivePush = (operator: Operator<any, any>, result: any = 0) => {
5858- it('responds to eager Push signals (spec)', () => {
5959- const values: any[] = [];
6060- let talkback: TalkbackFn | null = null;
6161- let sink: Sink<any> | null = null;
6262- let pulls = 0;
6363-6464- const source: Source<any> = _sink => {
6565- (sink = _sink)(
6666- start(signal => {
6767- if (signal === TalkbackKind.Pull) pulls++;
6868- })
6969- );
7070- };
7171-7272- operator(source)(signal => {
7373- expect(signal).not.toBe(SignalKind.End);
7474- if (signal === SignalKind.End) {
7575- /*noop*/
7676- } else if (signal.tag === SignalKind.Start) {
7777- talkback = signal[0];
7878- } else if (signal.tag === SignalKind.Push) {
7979- values.push(signal[0]);
8080- talkback!(TalkbackKind.Pull);
8181- }
8282- });
8383-8484- // No Pull signals should be issued initially
8585- expect(pulls).toBe(0);
8686-8787- // When pushing a value we expect an immediate response
8888- sink!(push(0));
8989- jest.runAllTimers();
9090- expect(values).toEqual([result]);
9191- // Subsequently the Pull signal should have travelled upwards
9292- expect(pulls).toBe(1);
9393- });
9494-};
9595-9696-/* This tests a noop operator for Close talkback signals from the sink.
9797- A Close signal will be sent, which should be forwarded to the source,
9898- which then ends the communication without sending an End signal. */
9999-const passesSinkClose = (operator: Operator<any, any>) => {
100100- it('responds to Close signals from sink (spec)', () => {
101101- let talkback: TalkbackFn | null = null;
102102- let closing = 0;
103103-104104- const source: Source<any> = sink => {
105105- sink(
106106- start(signal => {
107107- if (signal === TalkbackKind.Pull && !closing) {
108108- sink(push(0));
109109- } else if (signal === TalkbackKind.Close) {
110110- closing++;
111111- }
112112- })
113113- );
114114- };
115115-116116- const sink: Sink<any> = signal => {
117117- expect(signal).not.toBe(SignalKind.End);
118118- if (signal === SignalKind.End) {
119119- /*noop*/
120120- } else if (signal.tag === SignalKind.Push) {
121121- talkback!(TalkbackKind.Close);
122122- } else {
123123- talkback = signal[0];
124124- }
125125- };
126126-127127- operator(source)(sink);
128128-129129- // When pushing a value we expect an immediate close signal
130130- talkback!(TalkbackKind.Pull);
131131- jest.runAllTimers();
132132- expect(closing).toBe(1);
133133- });
134134-};
135135-136136-/* This tests a noop operator for End signals from the source.
137137- A Push and End signal will be sent after the first Pull talkback
138138- signal from the sink, which shouldn't lead to any extra Close or Pull
139139- talkback signals. */
140140-const passesSourceEnd = (operator: Operator<any, any>, result: any = 0) => {
141141- it('passes on immediate Push then End signals from source (spec)', () => {
142142- const signals: Signal<any>[] = [];
143143- let talkback: TalkbackFn | null = null;
144144- let pulls = 0;
145145- let ending = 0;
146146-147147- const source: Source<any> = sink => {
148148- sink(
149149- start(signal => {
150150- expect(signal).not.toBe(TalkbackKind.Close);
151151- if (signal === TalkbackKind.Pull) {
152152- pulls++;
153153- if (pulls === 1) {
154154- sink(push(0));
155155- sink(SignalKind.End);
156156- }
157157- }
158158- })
159159- );
160160- };
161161-162162- const sink: Sink<any> = signal => {
163163- if (signal === SignalKind.End) {
164164- signals.push(signal);
165165- ending++;
166166- } else if (signal.tag === SignalKind.Push) {
167167- signals.push(signal);
168168- } else {
169169- talkback = signal[0];
170170- }
171171- };
172172-173173- operator(source)(sink);
174174-175175- // When pushing a value we expect an immediate Push then End signal
176176- talkback!(TalkbackKind.Pull);
177177- jest.runAllTimers();
178178- expect(ending).toBe(1);
179179- expect(signals).toEqual([push(result), SignalKind.End]);
180180- // Also no additional pull event should be created by the operator
181181- expect(pulls).toBe(1);
182182- });
183183-};
184184-185185-/* This tests a noop operator for End signals from the source
186186- after the first pull in response to another.
187187- This is similar to passesSourceEnd but more well behaved since
188188- mergeMap/switchMap/concatMap are eager operators. */
189189-const passesSourcePushThenEnd = (operator: Operator<any, any>, result: any = 0) => {
190190- it('passes on End signals from source (spec)', () => {
191191- const signals: Signal<any>[] = [];
192192- let talkback: TalkbackFn | null = null;
193193- let pulls = 0;
194194- let ending = 0;
195195-196196- const source: Source<any> = sink => {
197197- sink(
198198- start(signal => {
199199- expect(signal).not.toBe(TalkbackKind.Close);
200200- if (signal === TalkbackKind.Pull) {
201201- pulls++;
202202- if (pulls <= 2) {
203203- sink(push(0));
204204- } else {
205205- sink(SignalKind.End);
206206- }
207207- }
208208- })
209209- );
210210- };
211211-212212- const sink: Sink<any> = signal => {
213213- if (signal === SignalKind.End) {
214214- signals.push(signal);
215215- ending++;
216216- } else if (signal.tag === SignalKind.Push) {
217217- signals.push(signal);
218218- talkback!(TalkbackKind.Pull);
219219- } else {
220220- talkback = signal[0];
221221- }
222222- };
223223-224224- operator(source)(sink);
225225-226226- // When pushing a value we expect an immediate Push then End signal
227227- talkback!(TalkbackKind.Pull);
228228- jest.runAllTimers();
229229- expect(ending).toBe(1);
230230- expect(pulls).toBe(3);
231231- expect(signals).toEqual([push(result), push(result), SignalKind.End]);
232232- });
233233-};
234234-235235-/* This tests a noop operator for Start signals from the source.
236236- When the operator's sink is started by the source it'll receive
237237- a Start event. As a response it should never send more than one
238238- Start signals to the sink. */
239239-const passesSingleStart = (operator: Operator<any, any>) => {
240240- it('sends a single Start event to the incoming sink (spec)', () => {
241241- let starts = 0;
242242-243243- const source: Source<any> = sink => {
244244- sink(start(() => {}));
245245- };
246246-247247- const sink: Sink<any> = signal => {
248248- if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
249249- starts++;
250250- }
251251- };
252252-253253- // When starting the operator we expect a single start event on the sink
254254- operator(source)(sink);
255255- expect(starts).toBe(1);
256256- });
257257-};
258258-259259-/* This tests a noop operator for silence after End signals from the source.
260260- When the operator receives the End signal it shouldn't forward any other
261261- signals to the sink anymore.
262262- This isn't a strict requirement, but some operators should ensure that
263263- all sources are well behaved. This is particularly true for operators
264264- that either Close sources themselves or may operate on multiple sources. */
265265-const passesStrictEnd = (operator: Operator<any, any>) => {
266266- it('stops all signals after End has been received (spec: strict end)', () => {
267267- let pulls = 0;
268268- const signals: Signal<any>[] = [];
269269-270270- const source: Source<any> = sink => {
271271- sink(
272272- start(signal => {
273273- if (signal === TalkbackKind.Pull) {
274274- pulls++;
275275- sink(SignalKind.End);
276276- sink(push(123));
277277- }
278278- })
279279- );
280280- };
281281-282282- const sink: Sink<any> = signal => {
283283- if (signal === SignalKind.End) {
284284- signals.push(signal);
285285- } else if (signal.tag === SignalKind.Push) {
286286- signals.push(signal);
287287- } else {
288288- signal[0](TalkbackKind.Pull);
289289- }
290290- };
291291-292292- operator(source)(sink);
293293-294294- // The Push signal should've been dropped
295295- jest.runAllTimers();
296296- expect(signals).toEqual([SignalKind.End]);
297297- expect(pulls).toBe(1);
298298- });
299299-300300- it('stops all signals after Close has been received (spec: strict close)', () => {
301301- const signals: Signal<any>[] = [];
302302-303303- const source: Source<any> = sink => {
304304- sink(
305305- start(signal => {
306306- if (signal === TalkbackKind.Close) {
307307- sink(push(123));
308308- }
309309- })
310310- );
311311- };
312312-313313- const sink: Sink<any> = signal => {
314314- if (signal === SignalKind.End) {
315315- signals.push(signal);
316316- } else if (signal.tag === SignalKind.Push) {
317317- signals.push(signal);
318318- } else {
319319- signal[0](TalkbackKind.Close);
320320- }
321321- };
322322-323323- operator(source)(sink);
324324-325325- // The Push signal should've been dropped
326326- jest.runAllTimers();
327327- expect(signals).toEqual([]);
328328- });
329329-};
330330-331331-/* This tests an immediately closing operator for End signals to
332332- the sink and Close signals to the source.
333333- When an operator closes immediately we expect to see a Close
334334- signal at the source and an End signal to the sink, since the
335335- closing operator is expected to end the entire chain. */
336336-const passesCloseAndEnd = (closingOperator: Operator<any, any>) => {
337337- it('closes the source and ends the sink correctly (spec: ending operator)', () => {
338338- let closing = 0;
339339- let ending = 0;
340340-341341- const source: Source<any> = sink => {
342342- sink(
343343- start(signal => {
344344- // For some operator tests we do need to send a single value
345345- if (signal === TalkbackKind.Pull) {
346346- sink(push(null));
347347- } else {
348348- closing++;
349349- }
350350- })
351351- );
352352- };
353353-354354- const sink: Sink<any> = signal => {
355355- if (signal === SignalKind.End) {
356356- ending++;
357357- } else if (signal.tag === SignalKind.Start) {
358358- signal[0](TalkbackKind.Pull);
359359- }
360360- };
361361-362362- // We expect the operator to immediately end and close
363363- closingOperator(source)(sink);
364364- expect(closing).toBe(1);
365365- expect(ending).toBe(1);
366366- });
367367-};
368368-369369-const passesAsyncSequence = (operator: Operator<any, any>, result: any = 0) => {
370370- it('passes an async push with an async end (spec)', () => {
371371- let hasPushed = false;
372372- const signals: Signal<any>[] = [];
373373-374374- const source: Source<any> = sink => {
375375- sink(
376376- start(signal => {
377377- if (signal === TalkbackKind.Pull && !hasPushed) {
378378- hasPushed = true;
379379- setTimeout(() => sink(push(0)), 10);
380380- setTimeout(() => sink(SignalKind.End), 20);
381381- }
382382- })
383383- );
384384- };
385385-386386- const sink: Sink<any> = signal => {
387387- if (signal === SignalKind.End) {
388388- signals.push(signal);
389389- } else if (signal.tag === SignalKind.Push) {
390390- signals.push(signal);
391391- } else {
392392- setTimeout(() => {
393393- signal[0](TalkbackKind.Pull);
394394- }, 5);
395395- }
396396- };
397397-398398- // We initially expect to see the push signal
399399- // Afterwards after all timers all other signals come in
400400- operator(source)(sink);
401401- expect(signals.length).toBe(0);
402402- jest.advanceTimersByTime(5);
403403- expect(hasPushed).toBeTruthy();
404404- jest.runAllTimers();
405405-406406- expect(signals).toEqual([push(result), SignalKind.End]);
407407- });
408408-};
4091941020beforeEach(() => {
41121 jest.useFakeTimers();