MIRROR: javascript for ๐Ÿœ's, a tiny runtime with big ambitions
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

implement pipe base

+826 -6
-1
include/gc/modules.h
··· 23 23 void gc_mark_domexception(ant_t *js, gc_mark_fn mark); 24 24 void gc_mark_readable_streams(ant_t *js, gc_mark_fn mark); 25 25 void gc_mark_writable_streams(ant_t *js, gc_mark_fn mark); 26 - void gc_mark_pipes(ant_t *js, gc_mark_fn mark); 27 26 28 27 #endif
-1
include/streams/pipes.h
··· 5 5 #include <stdbool.h> 6 6 7 7 void init_pipes_proto(ant_t *js, ant_value_t rs_proto); 8 - void gc_mark_pipes(ant_t *js, void (*mark)(ant_t *, ant_value_t)); 9 8 10 9 ant_value_t readable_stream_pipe_to( 11 10 ant_t *js, ant_value_t source, ant_value_t dest,
+2 -2
include/streams/writable.h
··· 35 35 ws_stream_t *ws_get_stream(ant_value_t obj); 36 36 ws_controller_t *ws_get_controller(ant_value_t obj); 37 37 38 - ant_value_t ws_stream_controller(ant_value_t stream_obj); 38 + ant_value_t ws_writer_ready(ant_value_t writer_obj); 39 39 ant_value_t ws_stream_writer(ant_value_t stream_obj); 40 - 40 + ant_value_t ws_stream_controller(ant_value_t stream_obj); 41 41 ant_value_t ws_acquire_writer(ant_t *js, ant_value_t stream_obj); 42 42 ant_value_t ws_writer_write(ant_t *js, ant_value_t writer_obj, ant_value_t chunk); 43 43
-1
src/gc/objects.c
··· 456 456 gc_mark_domexception(js, gc_mark_value); 457 457 gc_mark_readable_streams(js, gc_mark_value); 458 458 gc_mark_writable_streams(js, gc_mark_value); 459 - gc_mark_pipes(js, gc_mark_value); 460 459 461 460 for ( 462 461 ant_object_t *obj = g_pending_promises;
+823
src/streams/pipes.c
··· 1 + #include <string.h> 2 + 3 + #include "ant.h" 4 + #include "errors.h" 5 + #include "internal.h" 6 + #include "descriptors.h" 7 + 8 + #include "silver/engine.h" 9 + #include "modules/assert.h" 10 + #include "modules/domexception.h" 11 + #include "streams/pipes.h" 12 + #include "streams/readable.h" 13 + #include "streams/writable.h" 14 + 15 + static ant_value_t pipes_call_method( 16 + ant_t *js, ant_value_t obj, const char *name, 17 + ant_value_t *args, int nargs 18 + ) { 19 + ant_value_t fn = js_get(js, obj, name); 20 + if (!is_callable(fn)) return js_mkundef(); 21 + return sv_vm_call(js->vm, js, fn, obj, args, nargs, NULL, false); 22 + } 23 + 24 + static bool pipes_get_bool(ant_t *js, ant_value_t obj, const char *name) { 25 + return js_truthy(js, js_get(js, obj, name)); 26 + } 27 + 28 + static void pipes_set_bool(ant_t *js, ant_value_t obj, const char *name, bool value) { 29 + js_set(js, obj, name, js_bool(value)); 30 + } 31 + 32 + static ant_value_t pipes_abort_reason(ant_t *js, ant_value_t signal) { 33 + ant_value_t reason = js_get(js, signal, "reason"); 34 + if (!is_undefined(reason)) return reason; 35 + return make_dom_exception(js, "signal is aborted without reason", "AbortError"); 36 + } 37 + 38 + static bool pipes_is_abort_signal(ant_t *js, ant_value_t signal) { 39 + if (!is_object_type(signal)) return false; 40 + ant_value_t aborted = js_get(js, signal, "aborted"); 41 + ant_value_t add = js_get(js, signal, "addEventListener"); 42 + ant_value_t remove = js_get(js, signal, "removeEventListener"); 43 + return vtype(aborted) == T_BOOL && is_callable(add) && is_callable(remove); 44 + } 45 + 46 + static void pipes_chain_thenable( 47 + ant_t *js, ant_value_t value, 48 + ant_value_t on_resolve, ant_value_t on_reject 49 + ) { 50 + ant_value_t thenable = value; 51 + if (vtype(thenable) != T_PROMISE) { 52 + thenable = js_mkpromise(js); 53 + js_resolve_promise(js, thenable, value); 54 + } 55 + 56 + ant_value_t then_fn = js_get(js, thenable, "then"); 57 + if (!is_callable(then_fn)) return; 58 + 59 + ant_value_t then_args[2] = { on_resolve, on_reject }; 60 + sv_vm_call(js->vm, js, then_fn, thenable, then_args, 2, NULL, false); 61 + } 62 + 63 + typedef struct { 64 + bool settled; 65 + bool shutting_down; 66 + bool in_flight; 67 + bool prevent_close; 68 + bool prevent_abort; 69 + bool prevent_cancel; 70 + } pipe_state_t; 71 + 72 + static void pipe_state_finalize(ant_t *js, ant_object_t *obj) { 73 + if (!obj->extra_slots) return; 74 + ant_extra_slot_t *entries = (ant_extra_slot_t *)obj->extra_slots; 75 + for (uint8_t i = 0; i < obj->extra_count; i++) { 76 + if (entries[i].slot == SLOT_DATA && vtype(entries[i].value) == T_NUM) { 77 + free((pipe_state_t *)(uintptr_t)(size_t)js_getnum(entries[i].value)); 78 + return; 79 + }} 80 + } 81 + 82 + static pipe_state_t *pipe_get_state(ant_value_t state) { 83 + ant_value_t s = js_get_slot(state, SLOT_DATA); 84 + if (vtype(s) != T_NUM) return NULL; 85 + return (pipe_state_t *)(uintptr_t)(size_t)js_getnum(s); 86 + } 87 + 88 + static ant_value_t pipe_state_source(ant_value_t state) { 89 + return js_get_slot(state, SLOT_ENTRIES); 90 + } 91 + 92 + static ant_value_t pipe_state_dest(ant_value_t state) { 93 + return js_get_slot(state, SLOT_CTOR); 94 + } 95 + 96 + static ant_value_t pipe_state_reader(ant_value_t state) { 97 + return js_get_slot(state, SLOT_BUFFER); 98 + } 99 + 100 + static ant_value_t pipe_state_writer(ant_value_t state) { 101 + return js_get_slot(state, SLOT_DEFAULT); 102 + } 103 + 104 + static ant_value_t pipe_state_promise(ant_value_t state) { 105 + return js_get_slot(state, SLOT_RS_PULL); 106 + } 107 + 108 + static void pipes_release_reader(ant_t *js, ant_value_t reader_obj) { 109 + ant_value_t stream_obj = rs_reader_stream(reader_obj); 110 + if (!is_object_type(stream_obj)) return; 111 + 112 + if (rs_reader_has_reqs(js, reader_obj)) { 113 + js_mkerr_typed(js, JS_ERR_TYPE, "Reader was released"); 114 + rs_default_reader_error_read_requests(js, reader_obj, js->thrown_value); 115 + } 116 + 117 + ant_value_t old_closed = rs_reader_closed(reader_obj); 118 + ant_value_t new_closed = js_mkpromise(js); 119 + js_mkerr_typed(js, JS_ERR_TYPE, "Reader was released"); 120 + ant_value_t release_err = js->thrown_value; 121 + rs_stream_t *rs = rs_get_stream(stream_obj); 122 + 123 + if (rs && rs->state == RS_STATE_READABLE) { 124 + js_reject_promise(js, old_closed, release_err); 125 + promise_mark_handled(old_closed); 126 + } 127 + 128 + js_reject_promise(js, new_closed, release_err); 129 + promise_mark_handled(new_closed); 130 + js_set_slot(reader_obj, SLOT_RS_CLOSED, new_closed); 131 + js_set_slot(stream_obj, SLOT_CTOR, js_mkundef()); 132 + js_set_slot(reader_obj, SLOT_ENTRIES, js_mkundef()); 133 + } 134 + 135 + static void pipes_release_writer(ant_t *js, ant_value_t writer_obj) { 136 + ant_value_t ws_obj = js_get_slot(writer_obj, SLOT_ENTRIES); 137 + if (!is_object_type(ws_obj)) return; 138 + 139 + js_mkerr_typed(js, JS_ERR_TYPE, "Writer was released"); 140 + ant_value_t rel_err = js->thrown_value; 141 + ant_value_t ready = js_mkpromise(js); 142 + js_reject_promise(js, ready, rel_err); 143 + promise_mark_handled(ready); 144 + js_set_slot(writer_obj, SLOT_WS_READY, ready); 145 + 146 + ant_value_t closed = js_mkpromise(js); 147 + js_reject_promise(js, closed, rel_err); 148 + promise_mark_handled(closed); 149 + js_set_slot(writer_obj, SLOT_RS_CLOSED, closed); 150 + js_set_slot(ws_obj, SLOT_CTOR, js_mkundef()); 151 + js_set_slot(writer_obj, SLOT_ENTRIES, js_mkundef()); 152 + } 153 + 154 + static void pipes_release_locks(ant_t *js, ant_value_t state) { 155 + ant_value_t reader = pipe_state_reader(state); 156 + if (is_object_type(reader)) { 157 + pipes_release_reader(js, reader); 158 + js_set_slot(state, SLOT_BUFFER, js_mkundef()); 159 + } 160 + 161 + ant_value_t writer = pipe_state_writer(state); 162 + if (is_object_type(writer)) { 163 + pipes_release_writer(js, writer); 164 + js_set_slot(state, SLOT_DEFAULT, js_mkundef()); 165 + } 166 + } 167 + 168 + static void pipes_remove_abort_listener(ant_t *js, ant_value_t state) { 169 + ant_value_t signal = js_get(js, state, "signal"); 170 + ant_value_t listener = js_get(js, state, "abortListener"); 171 + if (!is_object_type(signal) || !is_callable(listener)) return; 172 + 173 + ant_value_t args[2] = { 174 + js_mkstr(js, "abort", 5), 175 + listener 176 + }; 177 + 178 + pipes_call_method(js, signal, "removeEventListener", args, 2); 179 + js_set(js, state, "abortListener", js_mkundef()); 180 + } 181 + 182 + static void pipes_settle(ant_t *js, ant_value_t state, bool ok, ant_value_t value) { 183 + pipe_state_t *pst = pipe_get_state(state); 184 + if (!pst || pst->settled) return; 185 + 186 + pst->settled = true; 187 + pst->shutting_down = true; 188 + pipes_remove_abort_listener(js, state); 189 + pipes_release_locks(js, state); 190 + 191 + ant_value_t promise = pipe_state_promise(state); 192 + if (ok) js_resolve_promise(js, promise, value); 193 + else js_reject_promise(js, promise, value); 194 + } 195 + 196 + static void pipes_ignore_promise(ant_value_t maybe_promise) { 197 + promise_mark_handled(maybe_promise); 198 + } 199 + 200 + static void pipes_shutdown_from_source_error(ant_t *js, ant_value_t state, ant_value_t error) { 201 + pipe_state_t *pst = pipe_get_state(state); 202 + if (!pst || pst->settled || pst->shutting_down) return; 203 + 204 + pst->shutting_down = true; 205 + pst->in_flight = false; 206 + 207 + if (!pst->prevent_abort) { 208 + ant_value_t result = writable_stream_abort(js, pipe_state_dest(state), error); 209 + pipes_ignore_promise(result); 210 + } 211 + 212 + pipes_settle(js, state, false, error); 213 + } 214 + 215 + static void pipes_shutdown_from_dest_error(ant_t *js, ant_value_t state, ant_value_t error) { 216 + pipe_state_t *pst = pipe_get_state(state); 217 + if (!pst || pst->settled || pst->shutting_down) return; 218 + 219 + pst->shutting_down = true; 220 + pst->in_flight = false; 221 + 222 + if (!pst->prevent_cancel) { 223 + ant_value_t result = readable_stream_cancel(js, pipe_state_source(state), error); 224 + pipes_ignore_promise(result); 225 + } 226 + 227 + pipes_settle(js, state, false, error); 228 + } 229 + 230 + static void pipes_shutdown_from_abort(ant_t *js, ant_value_t state, ant_value_t reason) { 231 + pipe_state_t *pst = pipe_get_state(state); 232 + if (!pst || pst->settled || pst->shutting_down) return; 233 + 234 + pst->shutting_down = true; 235 + pst->in_flight = false; 236 + 237 + if (!pst->prevent_abort) { 238 + ant_value_t result = writable_stream_abort(js, pipe_state_dest(state), reason); 239 + pipes_ignore_promise(result); 240 + } 241 + if (!pst->prevent_cancel) { 242 + ant_value_t result = readable_stream_cancel(js, pipe_state_source(state), reason); 243 + pipes_ignore_promise(result); 244 + } 245 + 246 + pipes_settle(js, state, false, reason); 247 + } 248 + 249 + static void pipes_pump(ant_t *js, ant_value_t state); 250 + 251 + static ant_value_t pipe_write_resolve(ant_t *js, ant_value_t *args, int nargs) { 252 + ant_value_t state = js_get_slot(js->current_func, SLOT_DATA); 253 + pipe_state_t *pst = pipe_get_state(state); 254 + if (!pst) return js_mkundef(); 255 + pst->in_flight = false; 256 + if (pst->settled || pst->shutting_down) 257 + return js_mkundef(); 258 + pipes_pump(js, state); 259 + return js_mkundef(); 260 + } 261 + 262 + static ant_value_t pipe_dest_error(ant_t *js, ant_value_t *args, int nargs) { 263 + ant_value_t state = js_get_slot(js->current_func, SLOT_DATA); 264 + ant_value_t error = (nargs > 0) ? args[0] : js_mkundef(); 265 + pipes_shutdown_from_dest_error(js, state, error); 266 + return js_mkundef(); 267 + } 268 + 269 + static ant_value_t pipe_close_dest_resolve(ant_t *js, ant_value_t *args, int nargs) { 270 + ant_value_t state = js_get_slot(js->current_func, SLOT_DATA); 271 + pipes_settle(js, state, true, js_mkundef()); 272 + return js_mkundef(); 273 + } 274 + 275 + static ant_value_t pipe_close_dest_reject(ant_t *js, ant_value_t *args, int nargs) { 276 + ant_value_t state = js_get_slot(js->current_func, SLOT_DATA); 277 + ant_value_t error = (nargs > 0) ? args[0] : js_mkundef(); 278 + pipes_settle(js, state, false, error); 279 + return js_mkundef(); 280 + } 281 + 282 + static ant_value_t pipe_read_resolve(ant_t *js, ant_value_t *args, int nargs) { 283 + ant_value_t state = js_get_slot(js->current_func, SLOT_DATA); 284 + pipe_state_t *pst = pipe_get_state(state); 285 + if (!pst || pst->settled || pst->shutting_down) { 286 + if (pst) pst->in_flight = false; 287 + return js_mkundef(); 288 + } 289 + 290 + ant_value_t result = (nargs > 0) ? args[0] : js_mkundef(); 291 + bool done = js_truthy(js, js_get(js, result, "done")); 292 + 293 + if (done) { 294 + pst->in_flight = false; 295 + if (pst->prevent_close) { 296 + pipes_settle(js, state, true, js_mkundef()); 297 + return js_mkundef(); 298 + } 299 + 300 + ant_value_t close_promise = writable_stream_close(js, pipe_state_dest(state)); 301 + ant_value_t on_resolve = js_heavy_mkfun(js, pipe_close_dest_resolve, state); 302 + ant_value_t on_reject = js_heavy_mkfun(js, pipe_close_dest_reject, state); 303 + pipes_chain_thenable(js, close_promise, on_resolve, on_reject); 304 + return js_mkundef(); 305 + } 306 + 307 + ant_value_t value = js_get(js, result, "value"); 308 + ant_value_t write_promise = ws_writer_write(js, pipe_state_writer(state), value); 309 + ant_value_t on_resolve = js_heavy_mkfun(js, pipe_write_resolve, state); 310 + ant_value_t on_reject = js_heavy_mkfun(js, pipe_dest_error, state); 311 + pipes_chain_thenable(js, write_promise, on_resolve, on_reject); 312 + return js_mkundef(); 313 + } 314 + 315 + static ant_value_t pipe_source_error(ant_t *js, ant_value_t *args, int nargs) { 316 + ant_value_t state = js_get_slot(js->current_func, SLOT_DATA); 317 + ant_value_t error = (nargs > 0) ? args[0] : js_mkundef(); 318 + pipes_shutdown_from_source_error(js, state, error); 319 + return js_mkundef(); 320 + } 321 + 322 + static ant_value_t pipe_ready_resolve(ant_t *js, ant_value_t *args, int nargs) { 323 + ant_value_t state = js_get_slot(js->current_func, SLOT_DATA); 324 + pipe_state_t *pst = pipe_get_state(state); 325 + if (!pst || pst->settled || pst->shutting_down) { 326 + if (pst) pst->in_flight = false; 327 + return js_mkundef(); 328 + } 329 + 330 + ant_value_t read_promise = rs_default_reader_read(js, pipe_state_reader(state)); 331 + ant_value_t on_resolve = js_heavy_mkfun(js, pipe_read_resolve, state); 332 + ant_value_t on_reject = js_heavy_mkfun(js, pipe_source_error, state); 333 + pipes_chain_thenable(js, read_promise, on_resolve, on_reject); 334 + return js_mkundef(); 335 + } 336 + 337 + static ant_value_t pipe_abort_listener(ant_t *js, ant_value_t *args, int nargs) { 338 + ant_value_t state = js_get_slot(js->current_func, SLOT_DATA); 339 + pipe_state_t *pst = pipe_get_state(state); 340 + if (!pst || pst->settled || pst->shutting_down) 341 + return js_mkundef(); 342 + 343 + ant_value_t signal = js_get(js, state, "signal"); 344 + pipes_shutdown_from_abort(js, state, pipes_abort_reason(js, signal)); 345 + return js_mkundef(); 346 + } 347 + 348 + static void pipes_pump(ant_t *js, ant_value_t state) { 349 + pipe_state_t *pst = pipe_get_state(state); 350 + if (!pst || pst->settled || pst->shutting_down || pst->in_flight) return; 351 + 352 + pst->in_flight = true; 353 + 354 + ant_value_t writer = pipe_state_writer(state); 355 + ant_value_t ready = ws_writer_ready(writer); 356 + ant_value_t on_resolve = js_heavy_mkfun(js, pipe_ready_resolve, state); 357 + ant_value_t on_reject = js_heavy_mkfun(js, pipe_dest_error, state); 358 + pipes_chain_thenable(js, ready, on_resolve, on_reject); 359 + } 360 + 361 + static ant_value_t pipe_create_rejected(ant_t *js, ant_value_t error) { 362 + ant_value_t promise = js_mkpromise(js); 363 + js_reject_promise(js, promise, error); 364 + return promise; 365 + } 366 + 367 + static void pipes_parse_options( 368 + ant_t *js, ant_value_t options, 369 + bool *prevent_close, bool *prevent_abort, bool *prevent_cancel, 370 + ant_value_t *signal 371 + ) { 372 + *prevent_close = false; 373 + *prevent_abort = false; 374 + *prevent_cancel = false; 375 + *signal = js_mkundef(); 376 + 377 + if (!is_object_type(options)) return; 378 + 379 + *prevent_close = js_truthy(js, js_get(js, options, "preventClose")); 380 + *prevent_abort = js_truthy(js, js_get(js, options, "preventAbort")); 381 + *prevent_cancel = js_truthy(js, js_get(js, options, "preventCancel")); 382 + *signal = js_get(js, options, "signal"); 383 + } 384 + 385 + ant_value_t readable_stream_pipe_to( 386 + ant_t *js, ant_value_t source, ant_value_t dest, 387 + bool prevent_close, bool prevent_abort, bool prevent_cancel, 388 + ant_value_t signal 389 + ) { 390 + rs_stream_t *rs = rs_get_stream(source); 391 + ws_stream_t *ws = ws_get_stream(dest); 392 + if (!rs || !ws) { 393 + js_mkerr_typed(js, JS_ERR_TYPE, "pipeTo requires a ReadableStream and WritableStream"); 394 + return pipe_create_rejected(js, js->thrown_value); 395 + } 396 + 397 + if (is_object_type(rs_stream_reader(source))) { 398 + js_mkerr_typed(js, JS_ERR_TYPE, "ReadableStream is already locked"); 399 + return pipe_create_rejected(js, js->thrown_value); 400 + } 401 + if (is_object_type(ws_stream_writer(dest))) { 402 + js_mkerr_typed(js, JS_ERR_TYPE, "WritableStream is already locked"); 403 + return pipe_create_rejected(js, js->thrown_value); 404 + } 405 + 406 + if (!is_undefined(signal) && !pipes_is_abort_signal(js, signal)) { 407 + js_mkerr_typed(js, JS_ERR_TYPE, "pipeTo option 'signal' must be an AbortSignal"); 408 + return pipe_create_rejected(js, js->thrown_value); 409 + } 410 + 411 + ant_value_t reader_args[1] = { source }; 412 + ant_value_t saved = js->new_target; 413 + js->new_target = g_reader_proto; 414 + ant_value_t reader = js_rs_reader_ctor(js, reader_args, 1); 415 + js->new_target = saved; 416 + if (is_err(reader)) return pipe_create_rejected(js, js->thrown_value); 417 + 418 + ant_value_t writer = ws_acquire_writer(js, dest); 419 + if (is_err(writer)) { 420 + pipes_release_reader(js, reader); 421 + return pipe_create_rejected(js, js->thrown_value); 422 + } 423 + 424 + pipe_state_t *pst = calloc(1, sizeof(pipe_state_t)); 425 + if (!pst) return js_mkerr(js, "out of memory"); 426 + pst->prevent_close = prevent_close; 427 + pst->prevent_abort = prevent_abort; 428 + pst->prevent_cancel = prevent_cancel; 429 + 430 + ant_value_t promise = js_mkpromise(js); 431 + ant_value_t state = js_mkobj(js); 432 + js_set_slot(state, SLOT_DATA, ANT_PTR(pst)); 433 + js_set_slot(state, SLOT_ENTRIES, source); 434 + js_set_slot(state, SLOT_CTOR, dest); 435 + js_set_slot(state, SLOT_BUFFER, reader); 436 + js_set_slot(state, SLOT_DEFAULT, writer); 437 + js_set_slot(state, SLOT_RS_PULL, promise); 438 + js_set_finalizer(state, pipe_state_finalize); 439 + 440 + js_set(js, state, "signal", signal); 441 + js_set(js, state, "abortListener", js_mkundef()); 442 + 443 + if (is_object_type(signal) && js_truthy(js, js_get(js, signal, "aborted"))) { 444 + pipes_shutdown_from_abort(js, state, pipes_abort_reason(js, signal)); 445 + return promise; 446 + } 447 + 448 + if (is_object_type(signal)) { 449 + ant_value_t listener = js_heavy_mkfun(js, pipe_abort_listener, state); 450 + ant_value_t options = js_mkobj(js); 451 + js_set(js, options, "once", js_true); 452 + ant_value_t args[3] = { 453 + js_mkstr(js, "abort", 5), 454 + listener, 455 + options 456 + }; 457 + pipes_call_method(js, signal, "addEventListener", args, 3); 458 + js_set(js, state, "abortListener", listener); 459 + } 460 + 461 + pipes_pump(js, state); 462 + return promise; 463 + } 464 + 465 + static ant_value_t js_rs_pipe_to(ant_t *js, ant_value_t *args, int nargs) { 466 + rs_stream_t *stream = rs_get_stream(js->this_val); 467 + if (!stream) { 468 + js_mkerr_typed(js, JS_ERR_TYPE, "Invalid ReadableStream"); 469 + return pipe_create_rejected(js, js->thrown_value); 470 + } 471 + 472 + ant_value_t dest = (nargs > 0) ? args[0] : js_mkundef(); 473 + bool prevent_close, prevent_abort, prevent_cancel; 474 + ant_value_t signal; 475 + pipes_parse_options(js, nargs > 1 ? args[1] : js_mkundef(), 476 + &prevent_close, &prevent_abort, &prevent_cancel, &signal); 477 + return readable_stream_pipe_to(js, js->this_val, dest, 478 + prevent_close, prevent_abort, prevent_cancel, signal); 479 + } 480 + 481 + static ant_value_t js_rs_pipe_through(ant_t *js, ant_value_t *args, int nargs) { 482 + rs_stream_t *stream = rs_get_stream(js->this_val); 483 + if (!stream) return js_mkerr_typed(js, JS_ERR_TYPE, "Invalid ReadableStream"); 484 + if (is_object_type(rs_stream_reader(js->this_val))) 485 + return js_mkerr_typed(js, JS_ERR_TYPE, "ReadableStream is already locked"); 486 + if (nargs < 1 || !is_object_type(args[0])) 487 + return js_mkerr_typed(js, JS_ERR_TYPE, "pipeThrough requires a transform object"); 488 + 489 + ant_value_t transform = args[0]; 490 + ant_value_t writable = js_get(js, transform, "writable"); 491 + ant_value_t readable = js_get(js, transform, "readable"); 492 + if (!is_object_type(writable) || !ws_get_stream(writable)) 493 + return js_mkerr_typed(js, JS_ERR_TYPE, "pipeThrough transform.writable must be a WritableStream"); 494 + if (!is_object_type(readable) || !rs_get_stream(readable)) 495 + return js_mkerr_typed(js, JS_ERR_TYPE, "pipeThrough transform.readable must be a ReadableStream"); 496 + if (is_object_type(ws_stream_writer(writable))) 497 + return js_mkerr_typed(js, JS_ERR_TYPE, "WritableStream is already locked"); 498 + 499 + bool prevent_close, prevent_abort, prevent_cancel; 500 + ant_value_t signal; 501 + pipes_parse_options(js, nargs > 1 ? args[1] : js_mkundef(), 502 + &prevent_close, &prevent_abort, &prevent_cancel, &signal); 503 + 504 + ant_value_t pipe_promise = readable_stream_pipe_to(js, js->this_val, writable, 505 + prevent_close, prevent_abort, prevent_cancel, signal); 506 + promise_mark_handled(pipe_promise); 507 + return readable; 508 + } 509 + 510 + typedef struct { 511 + bool pulling; 512 + bool done; 513 + bool canceled1; 514 + bool canceled2; 515 + } tee_state_t; 516 + 517 + static void tee_state_finalize(ant_t *js, ant_object_t *obj) { 518 + if (!obj->extra_slots) return; 519 + ant_extra_slot_t *entries = (ant_extra_slot_t *)obj->extra_slots; 520 + for (uint8_t i = 0; i < obj->extra_count; i++) { 521 + if (entries[i].slot == SLOT_DATA && vtype(entries[i].value) == T_NUM) { 522 + free((tee_state_t *)(uintptr_t)(size_t)js_getnum(entries[i].value)); 523 + return; 524 + }} 525 + } 526 + 527 + static tee_state_t *tee_get_state(ant_value_t state) { 528 + ant_value_t s = js_get_slot(state, SLOT_DATA); 529 + if (vtype(s) != T_NUM) return NULL; 530 + return (tee_state_t *)(uintptr_t)(size_t)js_getnum(s); 531 + } 532 + 533 + static ant_value_t tee_state_reader(ant_value_t state) { 534 + return js_get_slot(state, SLOT_BUFFER); 535 + } 536 + 537 + static void tee_release_reader(ant_t *js, ant_value_t state) { 538 + ant_value_t reader = tee_state_reader(state); 539 + if (!is_object_type(reader)) return; 540 + pipes_release_reader(js, reader); 541 + js_set_slot(state, SLOT_BUFFER, js_mkundef()); 542 + } 543 + 544 + static void tee_resolve_cancel_promises(ant_t *js, ant_value_t state) { 545 + ant_value_t p1 = js_get_slot(state, SLOT_RS_CLOSED); 546 + ant_value_t p2 = js_get_slot(state, SLOT_RS_SIZE); 547 + if (vtype(p1) == T_PROMISE) { 548 + js_resolve_promise(js, p1, js_mkundef()); 549 + js_set_slot(state, SLOT_RS_CLOSED, js_mkundef()); 550 + } 551 + if (vtype(p2) == T_PROMISE) { 552 + js_resolve_promise(js, p2, js_mkundef()); 553 + js_set_slot(state, SLOT_RS_SIZE, js_mkundef()); 554 + } 555 + } 556 + 557 + static void tee_reject_cancel_promises(ant_t *js, ant_value_t state, ant_value_t error) { 558 + ant_value_t p1 = js_get_slot(state, SLOT_RS_CLOSED); 559 + ant_value_t p2 = js_get_slot(state, SLOT_RS_SIZE); 560 + if (vtype(p1) == T_PROMISE) { 561 + js_reject_promise(js, p1, error); 562 + js_set_slot(state, SLOT_RS_CLOSED, js_mkundef()); 563 + } 564 + if (vtype(p2) == T_PROMISE) { 565 + js_reject_promise(js, p2, error); 566 + js_set_slot(state, SLOT_RS_SIZE, js_mkundef()); 567 + } 568 + } 569 + 570 + static void tee_finalize(ant_t *js, ant_value_t state) { 571 + tee_state_t *st = tee_get_state(state); 572 + if (!st || st->done) return; 573 + st->done = true; 574 + tee_release_reader(js, state); 575 + } 576 + 577 + static void tee_close_branch(ant_t *js, ant_value_t branch_stream) { 578 + ant_value_t ctrl = rs_stream_controller(js, branch_stream); 579 + rs_controller_t *c = rs_get_controller(ctrl); 580 + if (!c || c->close_requested) return; 581 + c->close_requested = true; 582 + if (rs_ctrl_queue_len(js, ctrl) == 0) { 583 + rs_default_controller_clear_algorithms(ctrl); 584 + readable_stream_close(js, branch_stream); 585 + } 586 + } 587 + 588 + static void tee_enqueue_branch(ant_t *js, ant_value_t branch_stream, ant_value_t value) { 589 + ant_value_t ctrl = rs_stream_controller(js, branch_stream); 590 + rs_controller_t *c = rs_get_controller(ctrl); 591 + if (!c || !rs_default_controller_can_close_or_enqueue(c, rs_get_stream(branch_stream))) 592 + return; 593 + 594 + ant_value_t r = rs_stream_reader(branch_stream); 595 + if (is_object_type(r) && rs_reader_has_reqs(js, r)) { 596 + rs_fulfill_read_request(js, branch_stream, value, false); 597 + rs_default_controller_call_pull_if_needed(js, ctrl); 598 + return; 599 + } 600 + 601 + double chunk_size = 1; 602 + ant_value_t size_fn = rs_ctrl_size(ctrl); 603 + if (is_callable(size_fn)) { 604 + ant_value_t sa[1] = { value }; 605 + ant_value_t sr = sv_vm_call(js->vm, js, size_fn, js_mkundef(), sa, 1, NULL, false); 606 + if (!is_err(sr)) 607 + chunk_size = vtype(sr) == T_NUM ? js_getnum(sr) : js_to_number(js, sr); 608 + } 609 + 610 + rs_ctrl_queue_push(js, ctrl, value); 611 + if (c->queue_sizes_len >= c->queue_sizes_cap) { 612 + uint32_t nc = c->queue_sizes_cap ? c->queue_sizes_cap * 2 : 8; 613 + double *ns = realloc(c->queue_sizes, nc * sizeof(double)); 614 + if (ns) { c->queue_sizes = ns; c->queue_sizes_cap = nc; } 615 + } 616 + if (c->queue_sizes_len < c->queue_sizes_cap) 617 + c->queue_sizes[c->queue_sizes_len++] = chunk_size; 618 + c->queue_total_size += chunk_size; 619 + rs_default_controller_call_pull_if_needed(js, ctrl); 620 + } 621 + 622 + static void tee_error_branch(ant_t *js, ant_value_t branch_stream, ant_value_t error) { 623 + readable_stream_error(js, branch_stream, error); 624 + } 625 + 626 + static void tee_pull(ant_t *js, ant_value_t state); 627 + 628 + static ant_value_t tee_cancel_both_resolve(ant_t *js, ant_value_t *args, int nargs) { 629 + ant_value_t state = js_get_slot(js->current_func, SLOT_DATA); 630 + tee_resolve_cancel_promises(js, state); 631 + tee_finalize(js, state); 632 + return js_mkundef(); 633 + } 634 + 635 + static ant_value_t tee_cancel_both_reject(ant_t *js, ant_value_t *args, int nargs) { 636 + ant_value_t state = js_get_slot(js->current_func, SLOT_DATA); 637 + ant_value_t error = (nargs > 0) ? args[0] : js_mkundef(); 638 + tee_reject_cancel_promises(js, state, error); 639 + tee_finalize(js, state); 640 + return js_mkundef(); 641 + } 642 + 643 + static ant_value_t tee_read_resolve(ant_t *js, ant_value_t *args, int nargs) { 644 + ant_value_t state = js_get_slot(js->current_func, SLOT_DATA); 645 + tee_state_t *st = tee_get_state(state); 646 + if (!st) return js_mkundef(); 647 + st->pulling = false; 648 + if (st->done) return js_mkundef(); 649 + 650 + ant_value_t result = (nargs > 0) ? args[0] : js_mkundef(); 651 + bool done = js_truthy(js, js_get(js, result, "done")); 652 + ant_value_t branch1 = js_get_slot(state, SLOT_CTOR); 653 + ant_value_t branch2 = js_get_slot(state, SLOT_DEFAULT); 654 + 655 + if (done) { 656 + if (!st->canceled1) tee_close_branch(js, branch1); 657 + if (!st->canceled2) tee_close_branch(js, branch2); 658 + tee_resolve_cancel_promises(js, state); 659 + tee_finalize(js, state); 660 + return js_mkundef(); 661 + } 662 + 663 + ant_value_t value = js_get(js, result, "value"); 664 + if (!st->canceled1) tee_enqueue_branch(js, branch1, value); 665 + if (!st->canceled2) tee_enqueue_branch(js, branch2, value); 666 + return js_mkundef(); 667 + } 668 + 669 + static ant_value_t tee_read_reject(ant_t *js, ant_value_t *args, int nargs) { 670 + ant_value_t state = js_get_slot(js->current_func, SLOT_DATA); 671 + tee_state_t *st = tee_get_state(state); 672 + if (!st) return js_mkundef(); 673 + ant_value_t error = (nargs > 0) ? args[0] : js_mkundef(); 674 + st->pulling = false; 675 + if (st->done) return js_mkundef(); 676 + 677 + ant_value_t branch1 = js_get_slot(state, SLOT_CTOR); 678 + ant_value_t branch2 = js_get_slot(state, SLOT_DEFAULT); 679 + 680 + if (!st->canceled1) tee_error_branch(js, branch1, error); 681 + if (!st->canceled2) tee_error_branch(js, branch2, error); 682 + tee_resolve_cancel_promises(js, state); 683 + tee_finalize(js, state); 684 + 685 + return js_mkundef(); 686 + } 687 + 688 + static void tee_pull(ant_t *js, ant_value_t state) { 689 + tee_state_t *st = tee_get_state(state); 690 + if (!st || st->done || st->pulling) return; 691 + if (st->canceled1 && st->canceled2) return; 692 + 693 + st->pulling = true; 694 + ant_value_t read_promise = rs_default_reader_read(js, tee_state_reader(state)); 695 + ant_value_t on_resolve = js_heavy_mkfun(js, tee_read_resolve, state); 696 + ant_value_t on_reject = js_heavy_mkfun(js, tee_read_reject, state); 697 + pipes_chain_thenable(js, read_promise, on_resolve, on_reject); 698 + } 699 + 700 + static ant_value_t tee_branch_pull(ant_t *js, ant_value_t *args, int nargs) { 701 + ant_value_t state = js_get_slot(js->current_func, SLOT_DATA); 702 + tee_pull(js, state); 703 + 704 + ant_value_t promise = js_mkpromise(js); 705 + js_resolve_promise(js, promise, js_mkundef()); 706 + 707 + return promise; 708 + } 709 + 710 + static ant_value_t tee_branch_cancel(ant_t *js, ant_value_t *args, int nargs) { 711 + ant_value_t wrapper = js_get_slot(js->current_func, SLOT_DATA); 712 + ant_value_t state = js_get_slot(wrapper, SLOT_DATA); 713 + 714 + int branch = (int)js_getnum(js_get_slot(wrapper, SLOT_ENTRIES)); 715 + ant_value_t reason = (nargs > 0) ? args[0] : js_mkundef(); 716 + tee_state_t *st = tee_get_state(state); 717 + if (!st) return js_mkundef(); 718 + 719 + bool is_b1 = (branch == 1); 720 + internal_slot_t reason_slot = is_b1 ? SLOT_RS_PULL : SLOT_RS_CANCEL; 721 + internal_slot_t promise_slot = is_b1 ? SLOT_RS_CLOSED : SLOT_RS_SIZE; 722 + bool already_canceled = is_b1 ? st->canceled1 : st->canceled2; 723 + 724 + if (already_canceled) { 725 + ant_value_t existing = js_get_slot(state, promise_slot); 726 + if (vtype(existing) == T_PROMISE) return existing; 727 + ant_value_t resolved = js_mkpromise(js); 728 + js_resolve_promise(js, resolved, js_mkundef()); 729 + return resolved; 730 + } 731 + 732 + if (is_b1) st->canceled1 = true; 733 + else st->canceled2 = true; 734 + js_set_slot(state, reason_slot, reason); 735 + 736 + ant_value_t promise = js_mkpromise(js); 737 + js_set_slot(state, promise_slot, promise); 738 + 739 + if (st->done) { 740 + js_resolve_promise(js, promise, js_mkundef()); 741 + js_set_slot(state, promise_slot, js_mkundef()); 742 + return promise; 743 + } 744 + 745 + if (st->canceled1 && st->canceled2) { 746 + ant_value_t reasons = js_mkarr(js); 747 + js_arr_push(js, reasons, js_get_slot(state, SLOT_RS_PULL)); 748 + js_arr_push(js, reasons, js_get_slot(state, SLOT_RS_CANCEL)); 749 + 750 + ant_value_t orig_stream = js_get_slot(state, SLOT_ENTRIES); 751 + ant_value_t cancel_promise = readable_stream_cancel(js, orig_stream, reasons); 752 + ant_value_t on_resolve = js_heavy_mkfun(js, tee_cancel_both_resolve, state); 753 + ant_value_t on_reject = js_heavy_mkfun(js, tee_cancel_both_reject, state); 754 + pipes_chain_thenable(js, cancel_promise, on_resolve, on_reject); 755 + } 756 + 757 + return promise; 758 + } 759 + 760 + static ant_value_t js_rs_tee(ant_t *js, ant_value_t *args, int nargs) { 761 + rs_stream_t *stream = rs_get_stream(js->this_val); 762 + if (!stream) return js_mkerr_typed(js, JS_ERR_TYPE, "Invalid ReadableStream"); 763 + if (is_object_type(rs_stream_reader(js->this_val))) 764 + return js_mkerr_typed(js, JS_ERR_TYPE, "ReadableStream is already locked"); 765 + 766 + ant_value_t reader_args[1] = { js->this_val }; 767 + ant_value_t saved = js->new_target; 768 + js->new_target = g_reader_proto; 769 + ant_value_t reader = js_rs_reader_ctor(js, reader_args, 1); 770 + js->new_target = saved; 771 + if (is_err(reader)) return reader; 772 + 773 + tee_state_t *st = calloc(1, sizeof(tee_state_t)); 774 + if (!st) return js_mkerr(js, "out of memory"); 775 + 776 + ant_value_t state = js_mkobj(js); 777 + js_set_slot(state, SLOT_DATA, ANT_PTR(st)); 778 + js_set_slot(state, SLOT_ENTRIES, js->this_val); 779 + js_set_slot(state, SLOT_BUFFER, reader); 780 + js_set_slot(state, SLOT_RS_PULL, js_mkundef()); 781 + js_set_slot(state, SLOT_RS_CANCEL, js_mkundef()); 782 + js_set_slot(state, SLOT_RS_CLOSED, js_mkundef()); 783 + js_set_slot(state, SLOT_RS_SIZE, js_mkundef()); 784 + js_set_finalizer(state, tee_state_finalize); 785 + 786 + ant_value_t pull1 = js_heavy_mkfun(js, tee_branch_pull, state); 787 + ant_value_t pull2 = js_heavy_mkfun(js, tee_branch_pull, state); 788 + 789 + ant_value_t cancel1_wrap = js_mkobj(js); 790 + js_set_slot(cancel1_wrap, SLOT_DATA, state); 791 + js_set_slot(cancel1_wrap, SLOT_ENTRIES, js_mknum(1)); 792 + ant_value_t cancel1 = js_heavy_mkfun(js, tee_branch_cancel, cancel1_wrap); 793 + 794 + ant_value_t cancel2_wrap = js_mkobj(js); 795 + js_set_slot(cancel2_wrap, SLOT_DATA, state); 796 + js_set_slot(cancel2_wrap, SLOT_ENTRIES, js_mknum(2)); 797 + ant_value_t cancel2 = js_heavy_mkfun(js, tee_branch_cancel, cancel2_wrap); 798 + 799 + ant_value_t branch1 = rs_create_stream(js, pull1, cancel1, 1); 800 + ant_value_t branch2 = rs_create_stream(js, pull2, cancel2, 1); 801 + if (is_err(branch1) || is_err(branch2)) { 802 + tee_release_reader(js, state); 803 + return is_err(branch1) ? branch1 : branch2; 804 + } 805 + 806 + js_set_slot(state, SLOT_CTOR, branch1); 807 + js_set_slot(state, SLOT_DEFAULT, branch2); 808 + 809 + ant_value_t result = js_mkarr(js); 810 + js_arr_push(js, result, branch1); 811 + js_arr_push(js, result, branch2); 812 + 813 + return result; 814 + } 815 + 816 + void init_pipes_proto(ant_t *js, ant_value_t rs_proto) { 817 + js_set(js, rs_proto, "pipeTo", js_mkfun(js_rs_pipe_to)); 818 + js_set_descriptor(js, rs_proto, "pipeTo", 6, JS_DESC_W | JS_DESC_C); 819 + js_set(js, rs_proto, "pipeThrough", js_mkfun(js_rs_pipe_through)); 820 + js_set_descriptor(js, rs_proto, "pipeThrough", 11, JS_DESC_W | JS_DESC_C); 821 + js_set(js, rs_proto, "tee", js_mkfun(js_rs_tee)); 822 + js_set_descriptor(js, rs_proto, "tee", 3, JS_DESC_W | JS_DESC_C); 823 + }
+1 -1
src/streams/writable.c
··· 124 124 return js_get_slot(writer_obj, SLOT_RS_CLOSED); 125 125 } 126 126 127 - static inline ant_value_t ws_writer_ready(ant_value_t writer_obj) { 127 + ant_value_t ws_writer_ready(ant_value_t writer_obj) { 128 128 return js_get_slot(writer_obj, SLOT_WS_READY); 129 129 } 130 130