MIRROR: javascript for ๐Ÿœ's, a tiny runtime with big ambitions
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

add queue handler

+17 -16
+3
include/streams/readable.h
··· 17 17 uint32_t queue_sizes_cap; 18 18 double queue_total_size; 19 19 double strategy_hwm; 20 + 20 21 bool close_requested; 22 + bool defer_close; 23 + bool in_enqueue; 21 24 bool pull_again; 22 25 bool pulling; 23 26 bool started;
+11 -4
src/streams/readable.c
··· 418 418 if (!rs_default_controller_can_close_or_enqueue(ctrl, stream)) 419 419 return js_mkerr_typed(js, JS_ERR_TYPE, "The stream is not in a state that permits close"); 420 420 ctrl->close_requested = true; 421 - if (rs_ctrl_queue_len(js, js->this_val) == 0) { 421 + if (rs_ctrl_queue_len(js, js->this_val) == 0 && !(ctrl->in_enqueue && ctrl->defer_close)) { 422 422 rs_default_controller_clear_algorithms(js->this_val); 423 423 readable_stream_close(js, stream_obj); 424 424 } ··· 444 444 445 445 double chunk_size = 1; 446 446 ant_value_t size_fn = rs_ctrl_size(js->this_val); 447 + ctrl->in_enqueue = true; 448 + 447 449 if (is_callable(size_fn)) { 448 450 ant_value_t size_args[1] = { chunk }; 449 451 ant_value_t size_result = sv_vm_call(js->vm, js, size_fn, js_mkundef(), size_args, 1, NULL, false); 450 452 if (is_err(size_result)) { 451 453 ant_value_t thrown = js->thrown_value; 452 454 ant_value_t err = is_object_type(thrown) ? thrown : size_result; 453 - if (stream && stream->state == RS_STATE_ERRORED) 454 - err = rs_stream_error(stream_obj); 455 + if (stream && stream->state == RS_STATE_ERRORED) err = rs_stream_error(stream_obj); 456 + ctrl->in_enqueue = false; 455 457 readable_stream_error(js, stream_obj, err); 456 458 return js_throw(js, err); 457 459 } 460 + 458 461 if (vtype(size_result) == T_NUM) chunk_size = js_getnum(size_result); 459 462 else chunk_size = js_to_number(js, size_result); 460 463 } 464 + ctrl->in_enqueue = false; 461 465 462 466 if (chunk_size < 0 || chunk_size != chunk_size || chunk_size == (double)INFINITY) { 463 467 js_mkerr_typed(js, JS_ERR_RANGE, ··· 514 518 515 519 double chunk_size = 1; 516 520 ant_value_t size_fn = rs_ctrl_size(ctrl_obj); 521 + ctrl->in_enqueue = true; 517 522 if (is_callable(size_fn)) { 518 523 ant_value_t size_args[1] = { chunk }; 519 524 ant_value_t size_result = sv_vm_call(js->vm, js, size_fn, js_mkundef(), size_args, 1, NULL, false); ··· 522 527 ant_value_t err = is_object_type(thrown) ? thrown : size_result; 523 528 if (stream && stream->state == RS_STATE_ERRORED) 524 529 err = rs_stream_error(stream_obj); 530 + ctrl->in_enqueue = false; 525 531 js->thrown_exists = false; 526 532 js->thrown_value = js_mkundef(); 527 533 js->thrown_stack = js_mkundef(); ··· 531 537 if (vtype(size_result) == T_NUM) chunk_size = js_getnum(size_result); 532 538 else chunk_size = js_to_number(js, size_result); 533 539 } 540 + ctrl->in_enqueue = false; 534 541 535 542 if (chunk_size < 0 || chunk_size != chunk_size || chunk_size == (double)INFINITY) { 536 543 ant_value_t err = js_make_error_silent(js, JS_ERR_RANGE, ··· 562 569 if (!rs_default_controller_can_close_or_enqueue(ctrl, stream)) return; 563 570 ctrl->close_requested = true; 564 571 565 - if (rs_ctrl_queue_len(js, ctrl_obj) == 0) { 572 + if (rs_ctrl_queue_len(js, ctrl_obj) == 0 && !(ctrl->in_enqueue && ctrl->defer_close)) { 566 573 rs_default_controller_clear_algorithms(ctrl_obj); 567 574 readable_stream_close(js, stream_obj); 568 575 }
+3 -12
src/streams/transform.c
··· 408 408 ant_value_t p = js_get_slot(wrapper, SLOT_DATA); 409 409 ant_value_t ts_obj = js_get_slot(wrapper, SLOT_ENTRIES); 410 410 ant_value_t reason = js_get_slot(wrapper, SLOT_BUFFER); 411 - bool started_by_abort = js_get_slot(wrapper, SLOT_CTOR) == js_true; 412 411 413 412 ant_value_t readable = ts_readable(ts_obj); 414 413 rs_stream_t *rs = rs_get_stream(readable); ··· 421 420 js_reject_promise(js, p, rs_stream_error(readable)); 422 421 return js_mkundef(); 423 422 } 424 - 425 423 ant_value_t writable = ts_writable(ts_obj); 426 424 ws_stream_t *ws = ws_get_stream(writable); 427 - if (!started_by_abort && ws && (ws->has_pending_abort || ts_cancel_joined_abort(ts_obj))) { 428 - js_reject_promise(js, p, ts_writable_stored_error(ts_obj)); 429 - return js_mkundef(); 430 - } 431 425 432 426 if (ws && ws->state == WS_STATE_WRITABLE) 433 427 ts_error_writable_and_unblock_write(js, ts_obj, reason); ··· 451 445 ant_value_t p = js_get_slot(wrapper, SLOT_DATA); 452 446 ant_value_t ts_obj = js_get_slot(wrapper, SLOT_ENTRIES); 453 447 ant_value_t reason = js_get_slot(wrapper, SLOT_BUFFER); 454 - bool started_by_abort = js_get_slot(wrapper, SLOT_CTOR) == js_true; 455 448 456 449 ant_value_t readable = ts_readable(ts_obj); 457 450 rs_stream_t *rs = rs_get_stream(readable); 458 451 if (rs && rs->state == RS_STATE_ERRORED) { 459 452 js_reject_promise(js, p, rs_stream_error(readable)); 460 - return js_mkundef(); 461 - } 462 - 463 - if (!started_by_abort) { 464 - js_reject_promise(js, p, ts_writable_stored_error(ts_obj)); 465 453 return js_mkundef(); 466 454 } 467 455 ··· 677 665 rs_stream_t *rs = rs_get_stream(readable); 678 666 if (rs && rs->state == RS_STATE_READABLE) { 679 667 ant_value_t rs_ctrl = rs_stream_controller(js, readable); 668 + rs_controller_t *rc = rs_get_controller(rs_ctrl); 669 + if (rc) rc->defer_close = true; 680 670 rs_controller_close(js, rs_ctrl); 671 + if (rc) rc->defer_close = false; 681 672 } 682 673 ts_set_flushing(ts_obj, false); 683 674 js_resolve_promise(js, p, js_mkundef());