Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add take operator and tests

+67
+34
__tests__/callbag_test.re
··· 306 306 expect(res) |> toEqual([| (1, 2), (2, 2), (2, 4) |]); 307 307 }); 308 308 }); 309 + 310 + describe("take", () => { 311 + open Expect; 312 + 313 + it("only lets a maximum number of values through", () => { 314 + let talkback = ref((_: Callbag_types.talkbackT) => ()); 315 + let num = ref(1); 316 + 317 + let source = Callbag.take(2, sink => sink(Start(signal => { 318 + switch (signal) { 319 + | Pull => { 320 + let i = num^; 321 + num := num^ + 1; 322 + sink(Push(i)); 323 + } 324 + | _ => () 325 + } 326 + }))); 327 + 328 + let res = [||]; 329 + 330 + source(signal => { 331 + switch (signal) { 332 + | Start(x) => talkback := x 333 + | _ => ignore(Js.Array.push(signal, res)) 334 + } 335 + }); 336 + 337 + talkback^(Pull); 338 + talkback^(Pull); 339 + talkback^(Pull); 340 + expect(res) |> toEqual([| Push(1), Push(2), End |]); 341 + }); 342 + }); 309 343 }); 310 344 311 345 describe("sink factories", () => {
+27
src/callbag.re
··· 237 237 })); 238 238 }; 239 239 240 + let take = (max, source, sink) => { 241 + let taken = ref(0); 242 + let talkback = ref((_: talkbackT) => ()); 243 + 244 + source(signal => { 245 + switch (signal) { 246 + | Start(tb) => { 247 + talkback := tb; 248 + sink(Start(signal => { 249 + if (taken^ < max) tb(signal); 250 + })); 251 + } 252 + | Push(_) when taken^ < max => { 253 + taken := taken^ + 1; 254 + sink(signal); 255 + 256 + if (taken^ === max) { 257 + sink(End); 258 + talkback^(End); 259 + }; 260 + } 261 + | End => sink(End) 262 + | _ => () 263 + } 264 + }); 265 + }; 266 + 240 267 let forEach = (f, source) => 241 268 captureTalkback(source, [@bs] (signal, talkback) => { 242 269 switch (signal) {
+6
src/callbag.rei
··· 60 60 ) => 61 61 unit; 62 62 63 + /* Takes a max number and a source, and creates a sink & source. 64 + It will emit values that the sink receives until the passed maximum number 65 + of values is reached, at which point it will end the source and the 66 + returned, new source. */ 67 + let take: (int, (signalT('a) => unit) => unit, signalT('a) => unit) => unit; 68 + 63 69 /* -- sink factories */ 64 70 65 71 /* Takes a function and a source, and creates a sink.