MIRROR: javascript for 🐜's, a tiny runtime with big ambitions
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at mir/inline-method 1929 lines 74 kB view raw
1#include <stdlib.h> 2#include <string.h> 3 4#include "ant.h" 5#include "ptr.h" 6#include "internal.h" 7#include "silver/engine.h" 8#include "esm/loader.h" 9#include "gc/roots.h" 10 11#include "modules/assert.h" 12#include "modules/buffer.h" 13#include "modules/events.h" 14#include "modules/stream.h" 15#include "modules/symbol.h" 16#include "modules/string_decoder.h" 17 18enum { STREAM_NATIVE_TAG = 0x5354524Du }; // STRM 19 20static ant_value_t g_stream_proto = 0; 21static ant_value_t g_stream_ctor = 0; 22 23static ant_value_t g_readable_proto = 0; 24static ant_value_t g_readable_ctor = 0; 25 26static ant_value_t g_writable_proto = 0; 27static ant_value_t g_writable_ctor = 0; 28 29static ant_value_t g_duplex_proto = 0; 30static ant_value_t g_duplex_ctor = 0; 31 32static ant_value_t g_transform_proto = 0; 33static ant_value_t g_transform_ctor = 0; 34 35static ant_value_t g_passthrough_proto = 0; 36static ant_value_t g_passthrough_ctor = 0; 37 38static double g_default_high_water_mark = 16384.0; 39static double g_default_object_high_water_mark = 16.0; 40 41static ant_value_t stream_noop(ant_t *js, ant_value_t *args, int nargs) { 42 return js_mkundef(); 43} 44 45static bool stream_is_instance(ant_value_t value) { 46 return is_object_type(value) && js_check_native_tag(value, STREAM_NATIVE_TAG); 47} 48 49static inline void stream_set_end_callback(ant_t *js, ant_value_t stream_obj, ant_value_t callback) { 50 js_set_slot_wb(js, stream_obj, SLOT_AUX, callback); 51} 52 53static stream_private_state_t *stream_private_state(ant_value_t stream_obj) { 54 if (!stream_is_instance(stream_obj)) return NULL; 55 return (stream_private_state_t *)js_get_native_ptr(stream_obj); 56} 57 58static ant_value_t stream_require_this(ant_t *js, ant_value_t value, const char *label) { 59 if (!stream_is_instance(value)) 60 return js_mkerr_typed(js, JS_ERR_TYPE, "Invalid %s", label); 61 return value; 62} 63 64static ant_value_t stream_truthy_or_object(ant_t *js, ant_value_t value) { 65 return 
js_truthy(js, value) ? value : js_mkobj(js); 66} 67 68static ant_value_t stream_readable_state(ant_t *js, ant_value_t stream_obj) { 69 return js_get(js, stream_obj, "_readableState"); 70} 71 72static ant_value_t stream_writable_state(ant_t *js, ant_value_t stream_obj) { 73 return js_get(js, stream_obj, "_writableState"); 74} 75 76static ant_value_t stream_pipes(ant_t *js, ant_value_t stream_obj) { 77 return js_get(js, stream_obj, "_pipes"); 78} 79 80static bool stream_key_is_cstr(ant_t *js, ant_value_t value, const char *expected) { 81 size_t len = 0; 82 const char *s = NULL; 83 if (vtype(value) != T_STR) return false; 84 s = js_getstr(js, value, &len); 85 return s && len == strlen(expected) && memcmp(s, expected, len) == 0; 86} 87 88static ant_value_t stream_event_key(ant_t *js, ant_value_t value) { 89 uint8_t t = vtype(value); 90 if (t == T_STR || t == T_SYMBOL) return value; 91 return js_mkerr(js, "event must be a string or Symbol"); 92} 93 94void *stream_get_attached_state(ant_value_t stream_obj) { 95 stream_private_state_t *priv = stream_private_state(stream_obj); 96 return priv ? 
priv->attached_state : NULL; 97} 98 99void stream_set_attached_state( 100 ant_value_t stream_obj, 101 void *state, 102 stream_finalize_fn finalize 103) { 104 stream_private_state_t *priv = stream_private_state(stream_obj); 105 if (!priv) return; 106 priv->attached_state = state; 107 priv->attached_state_finalize = finalize; 108} 109 110void stream_clear_attached_state(ant_value_t stream_obj) { 111 stream_private_state_t *priv = stream_private_state(stream_obj); 112 if (!priv) return; 113 priv->attached_state = NULL; 114 priv->attached_state_finalize = NULL; 115} 116 117static void stream_finalize(ant_t *js, ant_object_t *obj) { 118 ant_value_t stream_obj = js_obj_from_ptr(obj); 119 stream_private_state_t *priv = stream_private_state(stream_obj); 120 if (!priv) return; 121 js_set_native_ptr(stream_obj, NULL); 122 if (priv->attached_state && priv->attached_state_finalize) 123 priv->attached_state_finalize(js, stream_obj, priv->attached_state); 124 free(priv); 125} 126 127static ant_value_t stream_call( 128 ant_t *js, 129 ant_value_t fn, 130 ant_value_t this_val, 131 ant_value_t *args, 132 int nargs, 133 bool is_ctor 134) { 135 if (!is_callable(fn)) return js_mkundef(); 136 if (sv_check_c_stack_overflow(js)) 137 return js_mkerr_typed(js, JS_ERR_RANGE | JS_ERR_NO_STACK, "Maximum call stack size exceeded"); 138 139 sv_call_mode_t mode = is_ctor ? 
SV_CALL_MODE_CONSTRUCT : SV_CALL_MODE_NORMAL; 140 sv_call_plan_t plan; 141 ant_value_t err = sv_prepare_call(js->vm, js, fn, this_val, args, nargs, NULL, mode, &plan); 142 if (is_err(err)) return err; 143 144 return sv_execute_call_plan(js->vm, js, &plan, NULL); 145} 146 147static ant_value_t stream_call_prop( 148 ant_t *js, 149 ant_value_t target, 150 const char *name, 151 ant_value_t *args, 152 int nargs 153) { 154 ant_value_t fn = js_getprop_fallback(js, target, name); 155 if (is_err(fn) || !is_callable(fn)) return js_mkundef(); 156 return stream_call(js, fn, target, args, nargs, false); 157} 158 159static void stream_call_callback(ant_t *js, ant_value_t fn, ant_value_t *args, int nargs) { 160 if (!is_callable(fn)) return; 161 stream_call(js, fn, js_mkundef(), args, nargs, false); 162} 163 164static void stream_schedule_microtask(ant_t *js, ant_cfunc_t fn, ant_value_t data) { 165 ant_value_t promise = js_mkpromise(js); 166 ant_value_t cb = js_heavy_mkfun(js, fn, data); 167 ant_value_t then_result = 0; 168 169 js_resolve_promise(js, promise, js_mkundef()); 170 then_result = js_promise_then(js, promise, cb, js_mkundef()); 171 promise_mark_handled(then_result); 172} 173 174static ant_value_t stream_buffer_ctor(ant_t *js) { 175 ant_value_t ns = js_esm_import_sync_cstr(js, "buffer", 6); 176 if (is_err(ns)) return ns; 177 return js_get(js, ns, "Buffer"); 178} 179 180static ant_value_t stream_readable_decoder(ant_t *js, ant_value_t stream_obj) { 181 ant_value_t state = stream_readable_state(js, stream_obj); 182 if (!is_object_type(state)) return js_mkundef(); 183 return js_get(js, state, "decoder"); 184} 185 186static ant_value_t stream_readable_decode_chunk( 187 ant_t *js, ant_value_t stream_obj, 188 ant_value_t chunk, bool flush 189) { 190 ant_value_t decoder = stream_readable_decoder(js, stream_obj); 191 if (!is_object_type(decoder)) return chunk; 192 return string_decoder_decode_value(js, decoder, chunk, flush); 193} 194 195static bool 
stream_value_is_empty_string(ant_t *js, ant_value_t value) { 196 size_t len = 0; 197 if (vtype(value) != T_STR) return false; 198 (void)js_getstr(js, value, &len); 199 return len == 0; 200} 201 202static ant_value_t stream_make_buffer(ant_t *js, ant_value_t value, ant_value_t encoding) { 203 ant_value_t buffer_ctor = stream_buffer_ctor(js); 204 ant_value_t from_fn = 0; 205 ant_value_t args[2]; 206 207 if (is_err(buffer_ctor)) return buffer_ctor; 208 from_fn = js_get(js, buffer_ctor, "from"); 209 if (is_err(from_fn) || !is_callable(from_fn)) 210 return js_mkerr(js, "Buffer.from is not available"); 211 212 args[0] = value; 213 args[1] = encoding; 214 return stream_call(js, from_fn, buffer_ctor, args, 2, false); 215} 216 217static ant_value_t stream_normalize_chunk( 218 ant_t *js, 219 ant_value_t chunk, 220 bool object_mode, 221 ant_value_t encoding 222) { 223 ant_value_t str_val = 0; 224 225 if ( 226 object_mode || is_null(chunk) || is_undefined(chunk) || 227 vtype(chunk) == T_TYPEDARRAY || buffer_is_binary_source(chunk) 228 ) return chunk; 229 230 if (vtype(chunk) == T_STR) return stream_make_buffer(js, chunk, encoding); 231 232 str_val = js_tostring_val(js, chunk); 233 if (is_err(str_val)) return str_val; 234 235 return stream_make_buffer(js, str_val, encoding); 236} 237 238static ant_value_t stream_readable_buffer(ant_t *js, ant_value_t stream_obj) { 239 ant_value_t state = stream_readable_state(js, stream_obj); 240 if (!is_object_type(state)) return js_mkundef(); 241 return js_get(js, state, "buffer"); 242} 243 244static ant_offset_t stream_readable_buffer_head(ant_t *js, ant_value_t stream_obj) { 245 ant_value_t state = stream_readable_state(js, stream_obj); 246 ant_value_t head = is_object_type(state) ? js_get(js, state, "bufferHead") : js_mkundef(); 247 return vtype(head) == T_NUM ? 
(ant_offset_t)js_getnum(head) : 0; 248} 249 250static void stream_set_readable_buffer_head(ant_t *js, ant_value_t stream_obj, ant_offset_t head) { 251 ant_value_t state = stream_readable_state(js, stream_obj); 252 if (is_object_type(state)) js_set(js, state, "bufferHead", js_mknum((double)head)); 253} 254 255static ant_offset_t stream_readable_buffer_len(ant_t *js, ant_value_t stream_obj) { 256 ant_value_t buffer = stream_readable_buffer(js, stream_obj); 257 ant_offset_t head = stream_readable_buffer_head(js, stream_obj); 258 ant_offset_t len = vtype(buffer) == T_ARR ? js_arr_len(js, buffer) : 0; 259 return len > head ? len - head : 0; 260} 261 262static void stream_compact_readable_buffer(ant_t *js, ant_value_t stream_obj) { 263 ant_value_t state = stream_readable_state(js, stream_obj); 264 ant_value_t buffer = stream_readable_buffer(js, stream_obj); 265 ant_offset_t head = stream_readable_buffer_head(js, stream_obj); 266 ant_offset_t len = vtype(buffer) == T_ARR ? js_arr_len(js, buffer) : 0; 267 ant_value_t compact = 0; 268 269 if (!is_object_type(state) || vtype(buffer) != T_ARR) return; 270 if (head == 0) return; 271 272 if (head >= len) { 273 compact = js_mkarr(js); 274 js_set(js, state, "buffer", compact); 275 js_set(js, state, "bufferHead", js_mknum(0)); 276 return; 277 } 278 279 if (head <= 32 && head * 2 < len) return; 280 compact = js_mkarr(js); 281 for (ant_offset_t i = head; i < len; i++) js_arr_push(js, compact, js_arr_get(js, buffer, i)); 282 283 js_set(js, state, "buffer", compact); 284 js_set(js, state, "bufferHead", js_mknum(0)); 285} 286 287static void stream_buffer_push(ant_t *js, ant_value_t stream_obj, ant_value_t value) { 288 ant_value_t state = stream_readable_state(js, stream_obj); 289 ant_value_t buffer = stream_readable_buffer(js, stream_obj); 290 291 if (!is_object_type(state) || vtype(buffer) != T_ARR) return; 292 js_arr_push(js, buffer, value); 293} 294 295static ant_value_t stream_buffer_shift(ant_t *js, ant_value_t stream_obj) { 296 
ant_value_t buffer = stream_readable_buffer(js, stream_obj); 297 ant_offset_t head = stream_readable_buffer_head(js, stream_obj); 298 ant_offset_t len = vtype(buffer) == T_ARR ? js_arr_len(js, buffer) : 0; 299 ant_value_t value = js_mkundef(); 300 301 if (vtype(buffer) != T_ARR || head >= len) return js_mkundef(); 302 value = js_arr_get(js, buffer, head); 303 stream_set_readable_buffer_head(js, stream_obj, head + 1); 304 stream_compact_readable_buffer(js, stream_obj); 305 return value; 306} 307 308static bool stream_listener_count_positive(ant_t *js, ant_value_t target, const char *event_name) { 309 ant_value_t args[1]; 310 ant_value_t result = 0; 311 312 args[0] = js_mkstr(js, event_name, strlen(event_name)); 313 result = stream_call_prop(js, target, "listenerCount", args, 1); 314 return vtype(result) == T_NUM && js_getnum(result) > 0; 315} 316 317static void stream_remove_listener( 318 ant_t *js, 319 ant_value_t target, 320 const char *event_name, 321 ant_value_t listener 322) { 323 ant_value_t args[2]; 324 args[0] = js_mkstr(js, event_name, strlen(event_name)); 325 args[1] = listener; 326 stream_call_prop(js, target, "removeListener", args, 2); 327} 328 329static ant_value_t stream_get_option(ant_t *js, ant_value_t options, const char *name) { 330 if (!is_object_type(options)) return js_mkundef(); 331 return js_get(js, options, name); 332} 333 334static double stream_default_high_water_mark(bool object_mode) { 335 return object_mode ? g_default_object_high_water_mark : g_default_high_water_mark; 336} 337 338static double stream_high_water_mark_from_options(ant_t *js, ant_value_t options, bool object_mode) { 339 ant_value_t hwm = stream_get_option(js, options, "highWaterMark"); 340 return (vtype(hwm) == T_NUM && js_getnum(hwm) > 0) 341 ? 
js_getnum(hwm) 342 : stream_default_high_water_mark(object_mode); 343} 344 345static ant_value_t stream_make_base_object(ant_t *js, ant_value_t proto) { 346 ant_value_t obj = js_mkobj(js); 347 stream_private_state_t *priv = calloc(1, sizeof(*priv)); 348 349 if (is_object_type(proto)) js_set_proto_init(obj, proto); 350 js_set_native_tag(obj, STREAM_NATIVE_TAG); 351 352 if (priv) js_set_native_ptr(obj, priv); 353 js_set_slot(obj, SLOT_AUX, js_mkundef()); 354 js_set_finalizer(obj, stream_finalize); 355 356 return obj; 357} 358 359static void stream_init_base(ant_t *js, ant_value_t obj, ant_value_t raw_options) { 360 ant_value_t pipes = js_mkarr(js); 361 js_set(js, obj, "readable", js_true); 362 js_set(js, obj, "writable", js_true); 363 js_set(js, obj, "destroyed", js_false); 364 js_set(js, obj, "_paused", js_false); 365 js_set(js, obj, "_pipes", pipes); 366 js_set(js, obj, "_streamOptions", stream_truthy_or_object(js, raw_options)); 367} 368 369static void stream_init_readable(ant_t *js, ant_value_t obj, ant_value_t raw_options) { 370 ant_value_t options = is_object_type(raw_options) ? 
raw_options : js_mkobj(js); 371 ant_value_t state = js_mkobj(js); 372 ant_value_t read_fn = stream_get_option(js, options, "read"); 373 374 bool object_mode = js_truthy(js, stream_get_option(js, options, "objectMode")); 375 double high_water_mark = stream_high_water_mark_from_options(js, options, object_mode); 376 377 stream_init_base(js, obj, raw_options); 378 js_set(js, obj, "readable", js_true); 379 js_set(js, obj, "writable", js_false); 380 js_set(js, obj, "readableEnded", js_false); 381 382 js_set(js, state, "objectMode", js_bool(object_mode)); 383 js_set(js, state, "ended", js_false); 384 js_set(js, state, "endEmitted", js_false); 385 js_set(js, state, "flowing", js_false); 386 js_set(js, state, "flowingReadScheduled", js_false); 387 js_set(js, state, "reading", js_false); 388 js_set(js, state, "highWaterMark", js_mknum(high_water_mark)); 389 js_set(js, state, "buffer", js_mkarr(js)); 390 js_set(js, state, "bufferHead", js_mknum(0)); 391 js_set(js, obj, "_readableState", state); 392 393 if (is_callable(read_fn)) js_set(js, obj, "_read", read_fn); 394} 395 396static void stream_init_writable(ant_t *js, ant_value_t obj, ant_value_t raw_options) { 397 ant_value_t options = is_object_type(raw_options) ? 
raw_options : js_mkobj(js); 398 ant_value_t state = js_mkobj(js); 399 bool object_mode = js_truthy(js, stream_get_option(js, options, "objectMode")) 400 || js_truthy(js, stream_get_option(js, options, "writableObjectMode")); 401 ant_value_t write_fn = stream_get_option(js, options, "write"); 402 ant_value_t final_fn = stream_get_option(js, options, "final"); 403 ant_value_t destroy_fn = stream_get_option(js, options, "destroy"); 404 405 stream_init_base(js, obj, raw_options); 406 js_set(js, obj, "readable", js_false); 407 js_set(js, obj, "writable", js_true); 408 js_set(js, obj, "writableEnded", js_false); 409 js_set(js, obj, "writableFinished", js_false); 410 411 js_set(js, state, "objectMode", js_bool(object_mode)); 412 js_set(js, state, "finished", js_false); 413 js_set(js, state, "ended", js_false); 414 js_set(js, obj, "_writableState", state); 415 416 if (is_callable(write_fn)) js_set(js, obj, "_write", write_fn); 417 if (is_callable(final_fn)) js_set(js, obj, "_final", final_fn); 418 if (is_callable(destroy_fn)) js_set(js, obj, "_destroy", destroy_fn); 419} 420 421static ant_value_t stream_construct( 422 ant_t *js, 423 ant_value_t base_proto, 424 ant_value_t raw_options, 425 void (*init_fn)(ant_t *, ant_value_t, ant_value_t) 426) { 427 ant_value_t proto = js_instance_proto_from_new_target(js, base_proto); 428 ant_value_t obj = stream_make_base_object(js, is_object_type(proto) ? 
proto : base_proto); 429 init_fn(js, obj, raw_options); 430 return obj; 431} 432 433static ant_value_t stream_emit_named(ant_t *js, ant_value_t stream_obj, const char *event_name) { 434 return js_bool(eventemitter_emit_args(js, stream_obj, event_name, NULL, 0)); 435} 436 437static void stream_emit_error(ant_t *js, ant_value_t stream_obj, ant_value_t error) { 438 ant_value_t args[1]; 439 args[0] = error; 440 eventemitter_emit_args(js, stream_obj, "error", args, 1); 441} 442 443static void stream_readable_schedule_continue_flowing(ant_t *js, ant_value_t stream_obj) { 444 ant_value_t state = stream_readable_state(js, stream_obj); 445 446 if (!is_object_type(state)) return; 447 if (!js_truthy(js, js_get(js, state, "flowing"))) return; 448 if (js_truthy(js, js_get(js, stream_obj, "destroyed"))) return; 449 if (js_truthy(js, js_get(js, state, "ended"))) return; 450 if (stream_readable_buffer_len(js, stream_obj) > 0) return; 451 if (js_truthy(js, js_get(js, state, "flowingReadScheduled"))) return; 452 453 js_set(js, state, "flowingReadScheduled", js_true); 454 stream_schedule_microtask(js, stream_readable_continue_flowing, stream_obj); 455} 456 457static ant_value_t js_stream_pause(ant_t *js, ant_value_t *args, int nargs) { 458 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "stream"); 459 if (is_err(stream_obj)) return stream_obj; 460 461 js_set(js, stream_obj, "_paused", js_true); 462 stream_emit_named(js, stream_obj, "pause"); 463 return stream_obj; 464} 465 466static ant_value_t js_stream_resume(ant_t *js, ant_value_t *args, int nargs) { 467 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "stream"); 468 if (is_err(stream_obj)) return stream_obj; 469 470 js_set(js, stream_obj, "_paused", js_false); 471 stream_emit_named(js, stream_obj, "resume"); 472 return stream_obj; 473} 474 475static ant_value_t js_stream_is_paused(ant_t *js, ant_value_t *args, int nargs) { 476 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), 
"stream"); 477 ant_value_t paused = 0; 478 if (is_err(stream_obj)) return stream_obj; 479 paused = js_get(js, stream_obj, "_paused"); 480 return js_bool(js_truthy(js, paused)); 481} 482 483static void stream_pipe_remove_state(ant_t *js, ant_value_t source, ant_value_t state_obj) { 484 ant_value_t pipes = stream_pipes(js, source); 485 ant_offset_t len = vtype(pipes) == T_ARR ? js_arr_len(js, pipes) : 0; 486 ant_value_t next = js_mkarr(js); 487 488 for (ant_offset_t i = 0; i < len; i++) { 489 ant_value_t item = js_arr_get(js, pipes, i); 490 if (item != state_obj) js_arr_push(js, next, item); 491 } 492 493 js_set(js, source, "_pipes", next); 494} 495 496static void stream_pipe_cleanup(ant_t *js, ant_value_t state_obj) { 497 ant_value_t cleaned = js_get(js, state_obj, "cleaned"); 498 ant_value_t source = js_get(js, state_obj, "source"); 499 ant_value_t dest = js_get(js, state_obj, "dest"); 500 ant_value_t on_data = js_get(js, state_obj, "onData"); 501 ant_value_t on_drain = js_get(js, state_obj, "onDrain"); 502 ant_value_t on_end = js_get(js, state_obj, "onEnd"); 503 ant_value_t on_close = js_get(js, state_obj, "onClose"); 504 ant_value_t on_error = js_get(js, state_obj, "onError"); 505 506 if (js_truthy(js, cleaned)) return; 507 js_set(js, state_obj, "cleaned", js_true); 508 509 if (stream_is_instance(source)) { 510 stream_remove_listener(js, source, "data", on_data); 511 stream_remove_listener(js, source, "end", on_end); 512 stream_remove_listener(js, source, "close", on_close); 513 stream_remove_listener(js, source, "error", on_error); 514 stream_pipe_remove_state(js, source, state_obj); 515 } 516 517 if (is_object_type(dest)) 518 stream_remove_listener(js, dest, "drain", on_drain); 519} 520 521static ant_value_t stream_pipe_on_data(ant_t *js, ant_value_t *args, int nargs) { 522 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 523 ant_value_t source = js_get(js, state_obj, "source"); 524 ant_value_t dest = js_get(js, state_obj, "dest"); 525 
ant_value_t result = js_mkundef(); 526 527 if (!is_object_type(dest)) return js_mkundef(); 528 result = stream_call_prop(js, dest, "write", nargs > 0 ? &args[0] : NULL, nargs > 0 ? 1 : 0); 529 if (is_err(result)) return result; 530 531 if (result == js_false) stream_call_prop(js, source, "pause", NULL, 0); 532 return js_mkundef(); 533} 534 535static ant_value_t stream_pipe_on_drain(ant_t *js, ant_value_t *args, int nargs) { 536 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 537 ant_value_t source = js_get(js, state_obj, "source"); 538 stream_call_prop(js, source, "resume", NULL, 0); 539 return js_mkundef(); 540} 541 542static ant_value_t stream_pipe_on_end(ant_t *js, ant_value_t *args, int nargs) { 543 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 544 ant_value_t dest = js_get(js, state_obj, "dest"); 545 bool end_dest = js_truthy(js, js_get(js, state_obj, "end")); 546 stream_pipe_cleanup(js, state_obj); 547 if (end_dest) stream_call_prop(js, dest, "end", NULL, 0); 548 return js_mkundef(); 549} 550 551static ant_value_t stream_pipe_on_close(ant_t *js, ant_value_t *args, int nargs) { 552 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 553 stream_pipe_cleanup(js, state_obj); 554 return js_mkundef(); 555} 556 557static ant_value_t stream_pipe_on_error(ant_t *js, ant_value_t *args, int nargs) { 558 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 559 ant_value_t dest = js_get(js, state_obj, "dest"); 560 stream_pipe_cleanup(js, state_obj); 561 if (is_object_type(dest) && stream_listener_count_positive(js, dest, "error") && nargs > 0) 562 eventemitter_emit_args(js, dest, "error", &args[0], 1); 563 return js_mkundef(); 564} 565 566static ant_value_t js_stream_pipe(ant_t *js, ant_value_t *args, int nargs) { 567 ant_value_t source = stream_require_this(js, js_getthis(js), "stream"); 568 ant_value_t options = nargs > 1 ? 
args[1] : js_mkundef(); 569 ant_value_t state_obj = 0; 570 ant_value_t readable_state = 0; 571 bool end_dest = true; 572 573 if (is_err(source)) return source; 574 if (nargs < 1 || !is_object_type(args[0])) return js_mkerr(js, "pipe requires a destination stream"); 575 if (is_object_type(options)) { 576 ant_value_t end_val = js_get(js, options, "end"); 577 if (!is_undefined(end_val)) end_dest = end_val != js_false; 578 } 579 580 state_obj = js_mkobj(js); 581 js_set(js, state_obj, "source", source); 582 js_set(js, state_obj, "dest", args[0]); 583 js_set(js, state_obj, "end", js_bool(end_dest)); 584 js_set(js, state_obj, "cleaned", js_false); 585 js_set(js, state_obj, "onData", js_heavy_mkfun(js, stream_pipe_on_data, state_obj)); 586 js_set(js, state_obj, "onDrain", js_heavy_mkfun(js, stream_pipe_on_drain, state_obj)); 587 js_set(js, state_obj, "onEnd", js_heavy_mkfun(js, stream_pipe_on_end, state_obj)); 588 js_set(js, state_obj, "onClose", js_heavy_mkfun(js, stream_pipe_on_close, state_obj)); 589 js_set(js, state_obj, "onError", js_heavy_mkfun(js, stream_pipe_on_error, state_obj)); 590 591 js_arr_push(js, stream_pipes(js, source), state_obj); 592 eventemitter_add_listener(js, source, "data", js_get(js, state_obj, "onData"), false); 593 eventemitter_add_listener(js, source, "end", js_get(js, state_obj, "onEnd"), true); 594 eventemitter_add_listener(js, source, "close", js_get(js, state_obj, "onClose"), true); 595 eventemitter_add_listener(js, source, "error", js_get(js, state_obj, "onError"), false); 596 eventemitter_add_listener(js, args[0], "drain", js_get(js, state_obj, "onDrain"), false); 597 eventemitter_emit_args(js, args[0], "pipe", &source, 1); 598 readable_state = stream_readable_state(js, source); 599 if (is_object_type(readable_state)) js_set(js, readable_state, "flowing", js_true); 600 stream_call_prop(js, source, "resume", NULL, 0); 601 602 return args[0]; 603} 604 605static ant_value_t js_stream_unpipe(ant_t *js, ant_value_t *args, int nargs) { 606 
ant_value_t source = stream_require_this(js, js_getthis(js), "stream"); 607 ant_value_t pipes = 0; 608 ant_value_t matches = 0; 609 ant_offset_t len = 0; 610 ant_value_t dest = nargs > 0 ? args[0] : js_mkundef(); 611 612 if (is_err(source)) return source; 613 pipes = stream_pipes(js, source); 614 if (vtype(pipes) != T_ARR) return source; 615 616 matches = js_mkarr(js); 617 len = js_arr_len(js, pipes); 618 for (ant_offset_t i = 0; i < len; i++) { 619 ant_value_t state_obj = js_arr_get(js, pipes, i); 620 ant_value_t entry_dest = js_get(js, state_obj, "dest"); 621 if (!is_object_type(dest) || entry_dest == dest) js_arr_push(js, matches, state_obj); 622 } 623 624 len = js_arr_len(js, matches); 625 for (ant_offset_t i = 0; i < len; i++) stream_pipe_cleanup(js, js_arr_get(js, matches, i)); 626 return source; 627} 628 629static ant_value_t stream_destroy_done(ant_t *js, ant_value_t *args, int nargs) { 630 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 631 ant_value_t stream_obj = js_get(js, state_obj, "stream"); 632 ant_value_t destroyed_err = (nargs > 0) ? 
args[0] : js_mkundef(); 633 if (!is_null(destroyed_err) && !is_undefined(destroyed_err)) stream_emit_error(js, stream_obj, destroyed_err); 634 stream_emit_named(js, stream_obj, "close"); 635 return js_mkundef(); 636} 637 638static ant_value_t stream_once_call(ant_t *js, ant_value_t *args, int nargs) { 639 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 640 ant_value_t fn = js_get(js, state_obj, "fn"); 641 ant_value_t this_val = js_get(js, state_obj, "thisVal"); 642 ant_value_t called = js_get(js, state_obj, "called"); 643 644 if (js_truthy(js, called)) return js_mkundef(); 645 js_set(js, state_obj, "called", js_true); 646 return stream_call(js, fn, this_val, args, nargs, false); 647} 648 649static ant_value_t stream_make_once(ant_t *js, ant_value_t fn, ant_value_t this_val) { 650 ant_value_t state_obj = js_mkobj(js); 651 js_set(js, state_obj, "fn", fn); 652 js_set(js, state_obj, "thisVal", this_val); 653 js_set(js, state_obj, "called", js_false); 654 return js_heavy_mkfun(js, stream_once_call, state_obj); 655} 656 657static ant_value_t js_stream_destroy(ant_t *js, ant_value_t *args, int nargs) { 658 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "stream"); 659 ant_value_t destroy_fn = 0; 660 ant_value_t done_state = 0; 661 ant_value_t done = 0; 662 ant_value_t destroy_args[2]; 663 ant_value_t error = nargs > 0 ? args[0] : js_mkundef(); 664 ant_value_t result = 0; 665 666 if (is_err(stream_obj)) return stream_obj; 667 if (js_truthy(js, js_get(js, stream_obj, "destroyed"))) return stream_obj; 668 669 js_set(js, stream_obj, "destroyed", js_true); 670 671 done_state = js_mkobj(js); 672 js_set(js, done_state, "stream", stream_obj); 673 done = stream_make_once(js, js_heavy_mkfun(js, stream_destroy_done, done_state), js_mkundef()); 674 destroy_fn = js_getprop_fallback(js, stream_obj, "_destroy"); 675 676 if (is_callable(destroy_fn)) { 677 destroy_args[0] = is_undefined(error) ? 
js_mknull() : error; 678 destroy_args[1] = done; 679 result = stream_call(js, destroy_fn, stream_obj, destroy_args, 2, false); 680 return is_err(result) ? result : stream_obj; 681 } 682 683 destroy_args[0] = is_undefined(error) ? js_mknull() : error; 684 stream_call_callback(js, done, destroy_args, 1); 685 return stream_obj; 686} 687 688static ant_value_t js_readable__read(ant_t *js, ant_value_t *args, int nargs) { 689 return js_mkundef(); 690} 691 692static ant_value_t stream_readable_start_flowing(ant_t *js, ant_value_t *args, int nargs) { 693 ant_value_t stream_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 694 return stream_readable_begin_flowing(js, stream_obj); 695} 696 697ant_value_t stream_readable_continue_flowing(ant_t *js, ant_value_t *args, int nargs) { 698 ant_value_t stream_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 699 ant_value_t state = stream_readable_state(js, stream_obj); 700 701 if (!is_object_type(state)) return js_mkundef(); 702 js_set(js, state, "flowingReadScheduled", js_false); 703 704 if (!js_truthy(js, js_get(js, state, "flowing"))) return js_mkundef(); 705 if (js_truthy(js, js_get(js, stream_obj, "destroyed"))) return js_mkundef(); 706 if (js_truthy(js, js_get(js, state, "ended"))) return js_mkundef(); 707 708 stream_readable_maybe_read(js, stream_obj); 709 stream_readable_flush(js, stream_obj); 710 711 return js_mkundef(); 712} 713 714ant_value_t stream_readable_begin_flowing(ant_t *js, ant_value_t stream_obj) { 715 ant_value_t state = stream_readable_state(js, stream_obj); 716 717 if (!is_object_type(state)) return js_mkundef(); 718 if (!js_truthy(js, js_get(js, state, "flowing"))) return js_mkundef(); 719 720 { 721 ant_value_t saved_this = js->this_val; 722 js->this_val = stream_obj; 723 js_stream_resume(js, NULL, 0); 724 js->this_val = saved_this; 725 } 726 727 stream_readable_maybe_read(js, stream_obj); 728 stream_readable_flush(js, stream_obj); 729 730 return js_mkundef(); 731} 732 733ant_value_t 
stream_readable_flush(ant_t *js, ant_value_t stream_obj) { 734 ant_value_t state = stream_readable_state(js, stream_obj); 735 bool emitted_data = false; 736 737 if (!is_object_type(state)) return js_mkundef(); 738 739 while (js_truthy(js, js_get(js, state, "flowing")) && stream_readable_buffer_len(js, stream_obj) > 0) { 740 ant_value_t chunk = stream_buffer_shift(js, stream_obj); 741 chunk = stream_readable_decode_chunk(js, stream_obj, chunk, false); 742 if (is_err(chunk)) return chunk; 743 emitted_data = true; 744 eventemitter_emit_args(js, stream_obj, "data", &chunk, 1); 745 } 746 747 if ( 748 js_truthy(js, js_get(js, state, "ended")) && 749 stream_readable_buffer_len(js, stream_obj) == 0 && 750 !js_truthy(js, js_get(js, state, "endEmitted")) 751 ) { 752 ant_value_t tail = stream_readable_decode_chunk(js, stream_obj, js_mkundef(), true); 753 if (is_err(tail)) return tail; 754 if (!is_undefined(tail) && !stream_value_is_empty_string(js, tail)) { 755 emitted_data = true; 756 eventemitter_emit_args(js, stream_obj, "data", &tail, 1); 757 } 758 js_set(js, state, "endEmitted", js_true); 759 js_set(js, stream_obj, "readableEnded", js_true); 760 stream_emit_named(js, stream_obj, "end"); 761 stream_emit_named(js, stream_obj, "close"); 762 } else if (emitted_data) stream_readable_schedule_continue_flowing(js, stream_obj); 763 764 return js_mkundef(); 765} 766 767ant_value_t stream_readable_maybe_read(ant_t *js, ant_value_t stream_obj) { 768 ant_value_t state = stream_readable_state(js, stream_obj); 769 ant_value_t read_fn = 0; 770 ant_value_t args[1]; 771 772 if (!is_object_type(state)) return js_mkundef(); 773 if (js_truthy(js, js_get(js, stream_obj, "destroyed"))) return js_mkundef(); 774 if (js_truthy(js, js_get(js, state, "reading"))) return js_mkundef(); 775 if (js_truthy(js, js_get(js, state, "ended"))) return js_mkundef(); 776 if (stream_readable_buffer_len(js, stream_obj) > 0) return js_mkundef(); 777 778 read_fn = js_getprop_fallback(js, stream_obj, "_read"); 779 
js_set(js, state, "reading", js_true); 780 args[0] = js_get(js, state, "highWaterMark"); 781 782 if (is_callable(read_fn)) stream_call(js, read_fn, stream_obj, args, 1, false); 783 js_set(js, state, "reading", js_false); 784 785 return js_mkundef(); 786} 787 788ant_value_t stream_readable_push_value( 789 ant_t *js, 790 ant_value_t stream_obj, 791 ant_value_t chunk, 792 ant_value_t encoding 793) { 794 ant_value_t state = stream_readable_state(js, stream_obj); 795 ant_value_t normalized = 0; 796 797 if (!is_object_type(state)) return js_false; 798 if (js_truthy(js, js_get(js, stream_obj, "destroyed"))) return js_false; 799 800 if (is_null(chunk)) { 801 js_set(js, state, "ended", js_true); 802 stream_readable_flush(js, stream_obj); 803 return js_false; 804 } 805 806 normalized = stream_normalize_chunk( 807 js, chunk, 808 js_truthy(js, js_get(js, state, "objectMode")), 809 is_undefined(encoding) ? js_mkstr(js, "utf8", 4) : encoding 810 ); 811 if (is_err(normalized)) return normalized; 812 813 stream_buffer_push(js, stream_obj, normalized); 814 if (js_truthy(js, js_get(js, state, "flowing"))) stream_readable_flush(js, stream_obj); 815 816 return js_bool(js_truthy(js, js_get(js, state, "flowing"))); 817} 818 819static ant_value_t js_readable_push(ant_t *js, ant_value_t *args, int nargs) { 820 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Readable"); 821 ant_value_t chunk = nargs > 0 ? args[0] : js_mkundef(); 822 ant_value_t encoding = nargs > 1 ? 
args[1] : js_mkundef(); 823 if (is_err(stream_obj)) return stream_obj; 824 return stream_readable_push_value(js, stream_obj, chunk, encoding); 825} 826 827static ant_value_t js_readable_read(ant_t *js, ant_value_t *args, int nargs) { 828 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Readable"); 829 ant_value_t state = 0; 830 ant_value_t chunk = 0; 831 832 if (is_err(stream_obj)) return stream_obj; 833 state = stream_readable_state(js, stream_obj); 834 if (!is_object_type(state)) return js_mknull(); 835 836 if (stream_readable_buffer_len(js, stream_obj) == 0) stream_readable_maybe_read(js, stream_obj); 837 if (stream_readable_buffer_len(js, stream_obj) == 0) return js_mknull(); 838 839 chunk = stream_buffer_shift(js, stream_obj); 840 chunk = stream_readable_decode_chunk(js, stream_obj, chunk, false); 841 if (is_err(chunk)) return chunk; 842 if (js_truthy(js, js_get(js, state, "flowing"))) stream_readable_flush(js, stream_obj); 843 844 return chunk; 845} 846 847static ant_value_t js_readable_set_encoding(ant_t *js, ant_value_t *args, int nargs) { 848 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Readable"); 849 ant_value_t state = 0; ant_value_t decoder = 0; 850 851 ant_value_t encoding = nargs > 0 && !is_undefined(args[0]) ? 
args[0] : js_mkstr(js, "utf8", 4); 852 ant_value_t encoding_str = 0; 853 854 if (is_err(stream_obj)) return stream_obj; 855 state = stream_readable_state(js, stream_obj); 856 if (!is_object_type(state)) return stream_obj; 857 858 decoder = string_decoder_create(js, encoding); 859 if (is_err(decoder)) return decoder; 860 encoding_str = js_tostring_val(js, encoding); 861 if (is_err(encoding_str)) return encoding_str; 862 863 js_set(js, state, "decoder", decoder); 864 js_set(js, stream_obj, "encoding", encoding_str); 865 js_set(js, stream_obj, "readableEncoding", encoding_str); 866 867 return stream_obj; 868} 869 870static ant_value_t js_readable_on(ant_t *js, ant_value_t *args, int nargs) { 871 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Readable"); 872 ant_value_t key = 0; 873 ant_value_t state = 0; 874 875 if (is_err(stream_obj)) return stream_obj; 876 if (nargs < 2) return js_mkerr(js, "on requires 2 arguments (event, listener)"); 877 key = stream_event_key(js, args[0]); 878 879 if (is_err(key)) return key; 880 if (!eventemitter_add_listener_val(js, stream_obj, key, args[1], false)) 881 return js_mkerr(js, "listener must be a function"); 882 883 if (stream_key_is_cstr(js, key, "data")) { 884 state = stream_readable_state(js, stream_obj); 885 if (is_object_type(state)) js_set(js, state, "flowing", js_true); 886 stream_schedule_microtask(js, stream_readable_start_flowing, stream_obj); 887 } 888 889 return stream_obj; 890} 891 892static ant_value_t js_readable_resume(ant_t *js, ant_value_t *args, int nargs) { 893 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Readable"); 894 ant_value_t state = 0; 895 if (is_err(stream_obj)) return stream_obj; 896 897 state = stream_readable_state(js, stream_obj); 898 if (is_object_type(state)) js_set(js, state, "flowing", js_true); 899 900 js_stream_resume(js, NULL, 0); 901 stream_readable_maybe_read(js, stream_obj); 902 stream_readable_flush(js, stream_obj); 903 904 return stream_obj; 905} 
906 907static ant_value_t js_readable_pause(ant_t *js, ant_value_t *args, int nargs) { 908 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Readable"); 909 ant_value_t state = 0; 910 if (is_err(stream_obj)) return stream_obj; 911 912 state = stream_readable_state(js, stream_obj); 913 if (is_object_type(state)) js_set(js, state, "flowing", js_false); 914 return js_stream_pause(js, args, nargs); 915} 916 917static ant_value_t js_writable__write(ant_t *js, ant_value_t *args, int nargs) { 918 ant_value_t callback = nargs > 2 ? args[2] : js_mkundef(); 919 stream_call_callback(js, callback, NULL, 0); 920 return js_mkundef(); 921} 922 923static ant_value_t js_writable__final(ant_t *js, ant_value_t *args, int nargs) { 924 ant_value_t callback = nargs > 0 ? args[0] : js_mkundef(); 925 stream_call_callback(js, callback, NULL, 0); 926 return js_mkundef(); 927} 928 929static ant_value_t stream_writable_write_done(ant_t *js, ant_value_t *args, int nargs) { 930 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 931 ant_value_t stream_obj = js_get(js, state_obj, "stream"); 932 ant_value_t callback = js_get(js, state_obj, "callback"); 933 934 stream_private_state_t *priv = stream_private_state(stream_obj); 935 ant_value_t err = nargs > 0 ? 
args[0] : js_mkundef(); 936 937 if (priv) priv->writing = false; 938 939 if (!is_undefined(err) && !is_null(err)) { 940 ant_value_t destroy_args[1] = { err }; 941 js_set(js, state_obj, "done", js_true); 942 if (priv) priv->pending_final = false; { 943 ant_value_t saved_this = js->this_val; 944 js->this_val = stream_obj; 945 js_stream_destroy(js, destroy_args, 1); 946 js->this_val = saved_this; 947 } 948 949 if (is_callable(callback)) stream_call_callback(js, callback, &err, 1); 950 return js_mkundef(); 951 } 952 953 if (is_callable(callback)) stream_call_callback(js, callback, NULL, 0); 954 stream_emit_named(js, stream_obj, "drain"); 955 956 if (priv && priv->pending_final && !priv->final_started) { 957 ant_value_t end_callback = js_get_slot(stream_obj, SLOT_AUX); 958 priv->pending_final = false; 959 stream_set_end_callback(js, stream_obj, js_mkundef()); 960 return stream_writable_begin_end(js, stream_obj, end_callback); 961 } 962 963 return js_mkundef(); 964} 965 966static ant_value_t stream_writable_write_impl( 967 ant_t *js, 968 ant_value_t stream_obj, 969 ant_value_t chunk, 970 ant_value_t encoding, 971 ant_value_t callback, 972 bool allow_after_end 973) { 974 ant_value_t state = 0; 975 ant_value_t normalized = 0; 976 ant_value_t write_fn = 0; 977 ant_value_t done_state = 0; 978 ant_value_t done = 0; 979 ant_value_t write_args[3]; 980 981 state = stream_writable_state(js, stream_obj); 982 if (!is_object_type(state)) return js_false; 983 984 if ( 985 (!allow_after_end && js_truthy(js, js_get(js, stream_obj, "writableEnded"))) || 986 js_truthy(js, js_get(js, stream_obj, "destroyed")) 987 ) { 988 ant_value_t err = js_mkerr(js, "write after end"); 989 if (is_callable(callback)) stream_call_callback(js, callback, &err, 1); 990 else stream_emit_error(js, stream_obj, err); 991 return js_false; 992 } 993 994 normalized = stream_normalize_chunk( 995 js, chunk, 996 js_truthy(js, js_get(js, state, "objectMode")), 997 encoding 998 ); 999 1000 if (is_err(normalized)) return 
normalized; 1001 done_state = js_mkobj(js); 1002 1003 js_set(js, done_state, "stream", stream_obj); 1004 js_set(js, done_state, "callback", callback); 1005 js_set(js, done_state, "done", js_false); 1006 1007 done = stream_make_once(js, js_heavy_mkfun(js, stream_writable_write_done, done_state), js_mkundef()); 1008 write_fn = js_getprop_fallback(js, stream_obj, "_write"); 1009 stream_private_state_t *priv = stream_private_state(stream_obj); 1010 if (priv) priv->writing = true; 1011 1012 write_args[0] = normalized; 1013 write_args[1] = encoding; 1014 write_args[2] = done; 1015 1016 if (is_callable(write_fn)) { 1017 ant_value_t result = stream_call(js, write_fn, stream_obj, write_args, 3, false); 1018 if (is_err(result)) { 1019 ant_value_t err_args[1] = { result }; 1020 stream_call_callback(js, done, err_args, 1); 1021 return js_false; 1022 }} 1023 1024 return js_bool(!js_truthy(js, js_get(js, stream_obj, "destroyed"))); 1025} 1026 1027static ant_value_t js_writable_write(ant_t *js, ant_value_t *args, int nargs) { 1028 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Writable"); 1029 ant_value_t callback = js_mkundef(); 1030 ant_value_t encoding = js_mkstr(js, "utf8", 4); 1031 1032 if (is_err(stream_obj)) return stream_obj; 1033 1034 if (nargs > 1 && is_callable(args[1])) callback = args[1]; 1035 else if (nargs > 1 && !is_undefined(args[1])) encoding = args[1]; 1036 if (nargs > 2 && is_callable(args[2])) callback = args[2]; 1037 1038 return stream_writable_write_impl( 1039 js, stream_obj, 1040 nargs > 0 ? args[0] : js_mkundef(), 1041 encoding, callback, false 1042 ); 1043} 1044 1045static ant_value_t stream_writable_end_done(ant_t *js, ant_value_t *args, int nargs) { 1046 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 1047 ant_value_t stream_obj = js_get(js, state_obj, "stream"); 1048 ant_value_t callback = js_get(js, state_obj, "callback"); 1049 ant_value_t err = nargs > 0 ? 
args[0] : js_mkundef(); 1050 1051 if (!is_undefined(err) && !is_null(err)) { 1052 ant_value_t destroy_args[1] = { err }; 1053 ant_value_t saved_this = js->this_val; 1054 js->this_val = stream_obj; 1055 js_stream_destroy(js, destroy_args, 1); 1056 js->this_val = saved_this; 1057 if (is_callable(callback)) stream_call_callback(js, callback, &err, 1); 1058 return js_mkundef(); 1059 } 1060 1061 js_set(js, stream_obj, "writableFinished", js_true); 1062 js_set(js, stream_writable_state(js, stream_obj), "finished", js_true); 1063 stream_emit_named(js, stream_obj, "finish"); 1064 1065 if (is_callable(callback)) stream_call_callback(js, callback, NULL, 0); 1066 if (!js_truthy(js, js_get(js, stream_obj, "readable"))) stream_emit_named(js, stream_obj, "close"); 1067 1068 return js_mkundef(); 1069} 1070 1071ant_value_t stream_writable_begin_end(ant_t *js, ant_value_t stream_obj, ant_value_t callback) { 1072 ant_value_t final_fn = 0; 1073 ant_value_t final_args[1]; 1074 ant_value_t done_state = 0; 1075 ant_value_t done = 0; 1076 stream_private_state_t *priv = stream_private_state(stream_obj); 1077 1078 done_state = js_mkobj(js); 1079 js_set(js, done_state, "stream", stream_obj); 1080 js_set(js, done_state, "callback", callback); 1081 1082 done = stream_make_once(js, js_heavy_mkfun(js, stream_writable_end_done, done_state), js_mkundef()); 1083 if (priv) { 1084 priv->final_started = true; 1085 priv->pending_final = false; 1086 } 1087 1088 stream_set_end_callback(js, stream_obj, js_mkundef()); 1089 final_fn = js_getprop_fallback(js, stream_obj, "_final"); 1090 final_args[0] = done; 1091 1092 if (is_callable(final_fn)) stream_call(js, final_fn, stream_obj, final_args, 1, false); 1093 else stream_call_callback(js, done, NULL, 0); 1094 1095 return stream_obj; 1096} 1097 1098static ant_value_t stream_writable_end_after_write(ant_t *js, ant_value_t *args, int nargs) { 1099 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 1100 ant_value_t stream_obj = js_get(js, 
state_obj, "stream"); 1101 ant_value_t callback = js_get(js, state_obj, "callback"); 1102 stream_private_state_t *priv = stream_private_state(stream_obj); 1103 ant_value_t err = nargs > 0 ? args[0] : js_mkundef(); 1104 1105 if (!is_undefined(err) && !is_null(err)) { 1106 ant_value_t destroy_args[1] = { err }; 1107 ant_value_t saved_this = js->this_val; 1108 js->this_val = stream_obj; 1109 js_stream_destroy(js, destroy_args, 1); 1110 js->this_val = saved_this; 1111 if (is_callable(callback)) stream_call_callback(js, callback, &err, 1); 1112 return js_mkundef(); 1113 } 1114 1115 if (priv) priv->pending_final = false; 1116 stream_set_end_callback(js, stream_obj, js_mkundef()); 1117 1118 return stream_writable_begin_end(js, stream_obj, callback); 1119} 1120 1121static ant_value_t js_writable_end(ant_t *js, ant_value_t *args, int nargs) { 1122 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Writable"); 1123 ant_value_t callback = js_mkundef(); 1124 ant_value_t chunk = js_mkundef(); 1125 ant_value_t encoding = js_mkundef(); 1126 ant_value_t after_write_state = 0; 1127 ant_value_t after_write = 0; 1128 stream_private_state_t *priv = NULL; 1129 1130 if (is_err(stream_obj)) return stream_obj; 1131 if (nargs > 0 && is_callable(args[0])) callback = args[0]; 1132 else { 1133 if (nargs > 0) chunk = args[0]; 1134 if (nargs > 1 && is_callable(args[1])) callback = args[1]; 1135 else if (nargs > 1) encoding = args[1]; 1136 if (nargs > 2 && is_callable(args[2])) callback = args[2]; 1137 } 1138 1139 if (js_truthy(js, js_get(js, stream_obj, "writableEnded"))) { 1140 if (is_callable(callback)) stream_call_callback(js, callback, NULL, 0); 1141 return stream_obj; 1142 } 1143 1144 js_set(js, stream_obj, "writableEnded", js_true); 1145 js_set(js, stream_writable_state(js, stream_obj), "ended", js_true); 1146 priv = stream_private_state(stream_obj); 1147 1148 if (!is_undefined(chunk) && !is_null(chunk)) { 1149 after_write_state = js_mkobj(js); 1150 js_set(js, 
after_write_state, "stream", stream_obj); 1151 js_set(js, after_write_state, "callback", callback); 1152 1153 after_write = stream_make_once( 1154 js, js_heavy_mkfun(js, stream_writable_end_after_write, after_write_state), 1155 js_mkundef() 1156 ); 1157 1158 stream_writable_write_impl(js, stream_obj, chunk, encoding, after_write, true); 1159 return stream_obj; 1160 } 1161 1162 if (priv && priv->writing && !priv->final_started) { 1163 priv->pending_final = true; 1164 stream_set_end_callback(js, stream_obj, callback); 1165 return stream_obj; 1166 } 1167 1168 return stream_writable_begin_end(js, stream_obj, callback); 1169} 1170 1171static ant_value_t js_transform__transform(ant_t *js, ant_value_t *args, int nargs) { 1172 ant_value_t callback = nargs > 2 ? args[2] : js_mkundef(); 1173 ant_value_t cb_args[2]; 1174 cb_args[0] = js_mknull(); 1175 cb_args[1] = nargs > 0 ? args[0] : js_mkundef(); 1176 stream_call_callback(js, callback, cb_args, 2); 1177 return js_mkundef(); 1178} 1179 1180static ant_value_t stream_transform_write_callback(ant_t *js, ant_value_t *args, int nargs) { 1181 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 1182 ant_value_t stream_obj = js_get(js, state_obj, "stream"); 1183 ant_value_t outer_callback = js_get(js, state_obj, "callback"); 1184 1185 if (nargs > 0 && !is_null(args[0]) && !is_undefined(args[0])) { 1186 if (is_callable(outer_callback)) stream_call(js, outer_callback, stream_obj, &args[0], 1, false); 1187 return js_mkundef(); 1188 } 1189 1190 if (nargs > 1 && !is_null(args[1]) && !is_undefined(args[1])) 1191 stream_readable_push_value(js, stream_obj, args[1], js_mkundef()); 1192 1193 if (is_callable(outer_callback)) stream_call(js, outer_callback, stream_obj, NULL, 0, false); 1194 return js_mkundef(); 1195} 1196 1197static ant_value_t js_transform__write(ant_t *js, ant_value_t *args, int nargs) { 1198 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Transform"); 1199 ant_value_t transform_fn = 0; 
1200 ant_value_t cb_state = 0; 1201 ant_value_t cb = 0; 1202 ant_value_t call_args[3]; 1203 1204 if (is_err(stream_obj)) return stream_obj; 1205 transform_fn = js_getprop_fallback(js, stream_obj, "_transform"); 1206 1207 cb_state = js_mkobj(js); 1208 js_set(js, cb_state, "stream", stream_obj); 1209 js_set(js, cb_state, "callback", nargs > 2 ? args[2] : js_mkundef()); 1210 cb = js_heavy_mkfun(js, stream_transform_write_callback, cb_state); 1211 1212 call_args[0] = nargs > 0 ? args[0] : js_mkundef(); 1213 call_args[1] = nargs > 1 ? args[1] : js_mkstr(js, "utf8", 4); 1214 call_args[2] = cb; 1215 1216 return stream_call(js, transform_fn, stream_obj, call_args, 3, false); 1217} 1218 1219static ant_value_t stream_transform_final_callback(ant_t *js, ant_value_t *args, int nargs) { 1220 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 1221 ant_value_t stream_obj = js_get(js, state_obj, "stream"); 1222 ant_value_t callback = js_get(js, state_obj, "callback"); 1223 1224 if (nargs > 0 && !is_null(args[0]) && !is_undefined(args[0])) { 1225 if (is_callable(callback)) stream_call(js, callback, stream_obj, &args[0], 1, false); 1226 return js_mkundef(); 1227 } 1228 1229 if (nargs > 1 && !is_null(args[1]) && !is_undefined(args[1])) 1230 stream_readable_push_value(js, stream_obj, args[1], js_mkundef()); 1231 stream_readable_push_value(js, stream_obj, js_mknull(), js_mkundef()); 1232 if (is_callable(callback)) stream_call(js, callback, stream_obj, NULL, 0, false); 1233 1234 return js_mkundef(); 1235} 1236 1237static ant_value_t js_transform__final(ant_t *js, ant_value_t *args, int nargs) { 1238 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Transform"); 1239 ant_value_t flush_fn = 0; 1240 ant_value_t cb_state = 0; 1241 ant_value_t cb = 0; 1242 ant_value_t call_args[1]; 1243 1244 if (is_err(stream_obj)) return stream_obj; 1245 flush_fn = js_getprop_fallback(js, stream_obj, "_flush"); 1246 1247 if (!is_callable(flush_fn)) { 1248 
stream_readable_push_value(js, stream_obj, js_mknull(), js_mkundef()); 1249 stream_call_callback(js, nargs > 0 ? args[0] : js_mkundef(), NULL, 0); 1250 return js_mkundef(); 1251 } 1252 1253 cb_state = js_mkobj(js); 1254 js_set(js, cb_state, "stream", stream_obj); 1255 js_set(js, cb_state, "callback", nargs > 0 ? args[0] : js_mkundef()); 1256 cb = js_heavy_mkfun(js, stream_transform_final_callback, cb_state); 1257 call_args[0] = cb; 1258 return stream_call(js, flush_fn, stream_obj, call_args, 1, false); 1259} 1260 1261static ant_value_t js_passthrough__transform(ant_t *js, ant_value_t *args, int nargs) { 1262 return js_transform__transform(js, args, nargs); 1263} 1264 1265static ant_value_t stream_finished_cleanup(ant_t *js, ant_value_t state_obj) { 1266 ant_value_t stream_obj = js_get(js, state_obj, "stream"); 1267 if (stream_is_instance(stream_obj) || is_object_type(stream_obj)) { 1268 stream_remove_listener(js, stream_obj, "end", js_get(js, state_obj, "onFinish")); 1269 stream_remove_listener(js, stream_obj, "finish", js_get(js, state_obj, "onFinish")); 1270 stream_remove_listener(js, stream_obj, "close", js_get(js, state_obj, "onFinish")); 1271 stream_remove_listener(js, stream_obj, "error", js_get(js, state_obj, "onError")); 1272 } 1273 return js_mkundef(); 1274} 1275 1276static ant_value_t stream_finished_fire(ant_t *js, ant_value_t state_obj, ant_value_t error) { 1277 ant_value_t called = js_get(js, state_obj, "called"); 1278 ant_value_t callback = js_get(js, state_obj, "callback"); 1279 ant_value_t cb_args[1]; 1280 1281 if (js_truthy(js, called)) return js_mkundef(); 1282 js_set(js, state_obj, "called", js_true); 1283 stream_finished_cleanup(js, state_obj); 1284 1285 if (is_undefined(error)) stream_call_callback(js, callback, NULL, 0); 1286 else { 1287 cb_args[0] = error; 1288 stream_call_callback(js, callback, cb_args, 1); 1289 } 1290 return js_mkundef(); 1291} 1292 1293static ant_value_t stream_finished_on_finish(ant_t *js, ant_value_t *args, int nargs) { 
1294 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 1295 return stream_finished_fire(js, state_obj, js_mkundef()); 1296} 1297 1298static ant_value_t stream_finished_on_error(ant_t *js, ant_value_t *args, int nargs) { 1299 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 1300 ant_value_t error = nargs > 0 ? args[0] : js_mkundef(); 1301 return stream_finished_fire(js, state_obj, error); 1302} 1303 1304static ant_value_t stream_finished_register(ant_t *js, ant_value_t stream_obj, ant_value_t callback) { 1305 ant_value_t state_obj = js_mkobj(js); 1306 ant_value_t on_finish = 0; 1307 ant_value_t on_error = 0; 1308 1309 if (!is_callable(callback)) callback = js_mkfun(stream_noop); 1310 1311 js_set(js, state_obj, "stream", stream_obj); 1312 js_set(js, state_obj, "callback", callback); 1313 js_set(js, state_obj, "called", js_false); 1314 on_finish = js_heavy_mkfun(js, stream_finished_on_finish, state_obj); 1315 on_error = js_heavy_mkfun(js, stream_finished_on_error, state_obj); 1316 js_set(js, state_obj, "onFinish", on_finish); 1317 js_set(js, state_obj, "onError", on_error); 1318 1319 eventemitter_add_listener(js, stream_obj, "end", on_finish, false); 1320 eventemitter_add_listener(js, stream_obj, "finish", on_finish, false); 1321 eventemitter_add_listener(js, stream_obj, "close", on_finish, false); 1322 eventemitter_add_listener(js, stream_obj, "error", on_error, false); 1323 return stream_obj; 1324} 1325 1326static ant_value_t js_stream_finished(ant_t *js, ant_value_t *args, int nargs) { 1327 ant_value_t callback = nargs > 1 ? 
args[1] : js_mkundef(); 1328 if (nargs < 1 || !is_object_type(args[0])) return js_mkerr(js, "finished requires a stream"); 1329 return stream_finished_register(js, args[0], callback); 1330} 1331 1332static ant_value_t stream_pipeline_done(ant_t *js, ant_value_t *args, int nargs) { 1333 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 1334 ant_value_t callback = js_get(js, state_obj, "callback"); 1335 ant_value_t called = js_get(js, state_obj, "called"); 1336 ant_value_t cb_args[1]; 1337 1338 if (js_truthy(js, called)) return js_mkundef(); 1339 js_set(js, state_obj, "called", js_true); 1340 1341 if (nargs > 0 && !is_undefined(args[0])) { 1342 cb_args[0] = args[0]; 1343 stream_call_callback(js, callback, cb_args, 1); 1344 } else stream_call_callback(js, callback, NULL, 0); 1345 return js_mkundef(); 1346} 1347 1348static ant_value_t stream_pipeline_error(ant_t *js, ant_value_t *args, int nargs) { 1349 ant_value_t done = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 1350 if (nargs > 0 && !is_undefined(args[0])) stream_call_callback(js, done, &args[0], 1); 1351 return js_mkundef(); 1352} 1353 1354static ant_value_t stream_pipeline_schedule_done(ant_t *js, ant_value_t *args, int nargs) { 1355 ant_value_t done = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 1356 stream_call_callback(js, done, NULL, 0); 1357 return js_mkundef(); 1358} 1359 1360static ant_value_t js_stream_pipeline(ant_t *js, ant_value_t *args, int nargs) { 1361 int stream_count = nargs; 1362 ant_value_t callback = js_mkundef(); 1363 ant_value_t done_state = 0; 1364 ant_value_t done = 0; 1365 1366 if (nargs > 0 && is_callable(args[nargs - 1])) { 1367 callback = args[nargs - 1]; 1368 stream_count--; 1369 } 1370 1371 if (!is_callable(callback)) callback = js_mkfun(stream_noop); 1372 if (stream_count <= 0) return js_mkundef(); 1373 1374 done_state = js_mkobj(js); 1375 js_set(js, done_state, "callback", callback); 1376 js_set(js, done_state, "called", js_false); 1377 done = 
js_heavy_mkfun(js, stream_pipeline_done, done_state); 1378 1379 if (stream_count < 2) { 1380 stream_schedule_microtask(js, stream_pipeline_schedule_done, done); 1381 return args[0]; 1382 } 1383 1384 for (int i = 0; i < stream_count - 1; i++) { 1385 ant_value_t error_cb = js_heavy_mkfun(js, stream_pipeline_error, done); 1386 ant_value_t finished_args[2]; 1387 1388 finished_args[0] = args[i]; 1389 finished_args[1] = error_cb; 1390 js_stream_finished(js, finished_args, 2); 1391 1392 ant_value_t pipe_args[2]; 1393 pipe_args[0] = args[i + 1]; 1394 pipe_args[1] = js_mkundef(); 1395 stream_call_prop(js, args[i], "pipe", pipe_args, 2); 1396 } 1397 1398 { 1399 ant_value_t finished_args[2]; 1400 finished_args[0] = args[stream_count - 1]; 1401 finished_args[1] = done; 1402 js_stream_finished(js, finished_args, 2); 1403 } 1404 1405 return args[stream_count - 1]; 1406} 1407 1408static ant_value_t stream_promise_callback(ant_t *js, ant_value_t *args, int nargs) { 1409 ant_value_t promise = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 1410 if (nargs > 0 && !is_undefined(args[0])) js_reject_promise(js, promise, args[0]); 1411 else js_resolve_promise(js, promise, js_mkundef()); 1412 return js_mkundef(); 1413} 1414 1415static ant_value_t js_stream_promises_finished(ant_t *js, ant_value_t *args, int nargs) { 1416 ant_value_t promise = js_mkpromise(js); 1417 ant_value_t finished_args[2]; 1418 if (nargs < 1 || !is_object_type(args[0])) { 1419 js_reject_promise(js, promise, js_mkerr(js, "finished requires a stream")); 1420 return promise; 1421 } 1422 finished_args[0] = args[0]; 1423 finished_args[1] = js_heavy_mkfun(js, stream_promise_callback, promise); 1424 js_stream_finished(js, finished_args, 2); 1425 return promise; 1426} 1427 1428static ant_value_t js_stream_promises_pipeline(ant_t *js, ant_value_t *args, int nargs) { 1429 ant_value_t promise = js_mkpromise(js); 1430 ant_value_t *call_args = NULL; 1431 1432 if (nargs <= 0) { 1433 js_resolve_promise(js, promise, js_mkundef()); 
1434 return promise; 1435 } 1436 1437 call_args = malloc((size_t)(nargs + 1) * sizeof(*call_args)); 1438 if (!call_args) { 1439 js_reject_promise(js, promise, js_mkerr(js, "out of memory")); 1440 return promise; 1441 } 1442 1443 for (int i = 0; i < nargs; i++) call_args[i] = args[i]; 1444 call_args[nargs] = js_heavy_mkfun(js, stream_promise_callback, promise); 1445 js_stream_pipeline(js, call_args, nargs + 1); 1446 free(call_args); 1447 return promise; 1448} 1449 1450static void stream_release_reader(ant_t *js, ant_value_t state_obj) { 1451 ant_value_t reader = js_get(js, state_obj, "reader"); 1452 if (!is_object_type(reader)) return; 1453 stream_call_prop(js, reader, "releaseLock", NULL, 0); 1454} 1455 1456static ant_value_t stream_readable_from_step(ant_t *js, ant_value_t *args, int nargs); 1457 1458static void stream_readable_from_schedule(ant_t *js, ant_value_t state_obj) { 1459 stream_schedule_microtask(js, stream_readable_from_step, state_obj); 1460} 1461 1462static ant_value_t stream_readable_from_fail(ant_t *js, ant_value_t state_obj, ant_value_t error) { 1463 ant_value_t readable = js_get(js, state_obj, "readable"); 1464 stream_release_reader(js, state_obj); 1465 if (stream_is_instance(readable)) { 1466 ant_value_t destroy_args[1] = { error }; 1467 ant_value_t saved_this = js->this_val; 1468 js->this_val = readable; 1469 js_stream_destroy(js, destroy_args, 1); 1470 js->this_val = saved_this; 1471 } 1472 return js_mkundef(); 1473} 1474 1475static ant_value_t stream_readable_from_handle_result(ant_t *js, ant_value_t *args, int nargs) { 1476 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 1477 ant_value_t readable = js_get(js, state_obj, "readable"); 1478 ant_value_t result = nargs > 0 ? 
args[0] : js_mkundef(); 1479 ant_value_t done = 0; 1480 ant_value_t value = 0; 1481 1482 if (js_truthy(js, js_get(js, readable, "destroyed"))) { 1483 stream_release_reader(js, state_obj); 1484 return js_mkundef(); 1485 } 1486 1487 if (!is_object_type(result)) return stream_readable_from_fail(js, state_obj, js_mkerr(js, "iterator step must be an object")); 1488 done = js_get(js, result, "done"); 1489 value = js_get(js, result, "value"); 1490 if (js_truthy(js, done)) { 1491 stream_release_reader(js, state_obj); 1492 stream_readable_push_value(js, readable, js_mknull(), js_mkundef()); 1493 return js_mkundef(); 1494 } 1495 1496 stream_readable_push_value(js, readable, value, js_mkundef()); 1497 if (!js_truthy(js, js_get(js, readable, "destroyed"))) stream_readable_from_schedule(js, state_obj); 1498 return js_mkundef(); 1499} 1500 1501static ant_value_t stream_readable_from_reject(ant_t *js, ant_value_t *args, int nargs) { 1502 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 1503 ant_value_t error = nargs > 0 ? 
args[0] : js_mkerr(js, "stream iteration failed"); 1504 return stream_readable_from_fail(js, state_obj, error); 1505} 1506 1507static ant_value_t stream_readable_from_step(ant_t *js, ant_value_t *args, int nargs) { 1508 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 1509 ant_value_t readable = js_get(js, state_obj, "readable"); 1510 ant_value_t mode = js_get(js, state_obj, "mode"); 1511 ant_value_t iterator = js_get(js, state_obj, "iterator"); 1512 1513 ant_value_t next_result = js_mkundef(); 1514 ant_value_t on_resolve = js_heavy_mkfun(js, stream_readable_from_handle_result, state_obj); 1515 ant_value_t on_reject = js_heavy_mkfun(js, stream_readable_from_reject, state_obj); 1516 ant_value_t then_result = 0; 1517 1518 if (js_truthy(js, js_get(js, readable, "destroyed"))) { 1519 stream_release_reader(js, state_obj); 1520 return js_mkundef(); 1521 } 1522 1523 if (stream_key_is_cstr(js, mode, "reader")) next_result = stream_call_prop(js, iterator, "read", NULL, 0); 1524 else next_result = stream_call_prop(js, iterator, "next", NULL, 0); 1525 1526 if (is_err(next_result)) return stream_readable_from_fail(js, state_obj, next_result); 1527 if (vtype(next_result) == T_PROMISE) { 1528 then_result = js_promise_then(js, next_result, on_resolve, on_reject); 1529 promise_mark_handled(then_result); 1530 return js_mkundef(); 1531 } 1532 1533 ant_value_t one_arg[1] = { next_result }; 1534 return stream_readable_from_handle_result(js, one_arg, 1); 1535} 1536 1537static ant_value_t stream_readable_from_start(ant_t *js, ant_value_t *args, int nargs) { 1538 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 1539 ant_value_t readable = js_get(js, state_obj, "readable"); 1540 ant_value_t source = js_get(js, state_obj, "source"); 1541 1542 ant_value_t async_iter_fn = 0; 1543 ant_value_t reader_fn = 0; 1544 js_iter_t it; 1545 1546 if (js_truthy(js, js_get(js, readable, "destroyed"))) return js_mkundef(); 1547 1548 async_iter_fn = 
is_object_type(source) ? js_get_sym(js, source, get_asyncIterator_sym()) : js_mkundef(); 1549 if (is_callable(async_iter_fn)) { 1550 ant_value_t iterator = stream_call(js, async_iter_fn, source, NULL, 0, false); 1551 if (is_err(iterator)) return stream_readable_from_fail(js, state_obj, iterator); 1552 js_set(js, state_obj, "iterator", iterator); 1553 js_set(js, state_obj, "mode", js_mkstr(js, "async", 5)); 1554 stream_readable_from_schedule(js, state_obj); 1555 return js_mkundef(); 1556 } 1557 1558 if (js_iter_open(js, source, &it)) { 1559 ant_value_t value = 0; 1560 while (js_iter_next(js, &it, &value)) { 1561 if (js_truthy(js, js_get(js, readable, "destroyed"))) break; 1562 stream_readable_push_value(js, readable, value, js_mkundef()); 1563 } 1564 js_iter_close(js, &it); 1565 if (!js_truthy(js, js_get(js, readable, "destroyed"))) 1566 stream_readable_push_value(js, readable, js_mknull(), js_mkundef()); 1567 return js_mkundef(); 1568 } 1569 1570 reader_fn = is_object_type(source) ? js_get(js, source, "getReader") : js_mkundef(); 1571 if (is_callable(reader_fn)) { 1572 ant_value_t reader = stream_call(js, reader_fn, source, NULL, 0, false); 1573 if (is_err(reader)) return stream_readable_from_fail(js, state_obj, reader); 1574 js_set(js, state_obj, "reader", reader); 1575 js_set(js, state_obj, "iterator", reader); 1576 js_set(js, state_obj, "mode", js_mkstr(js, "reader", 6)); 1577 stream_readable_from_schedule(js, state_obj); 1578 return js_mkundef(); 1579 } 1580 1581 if (!is_undefined(source)) stream_readable_push_value(js, readable, source, js_mkundef()); 1582 stream_readable_push_value(js, readable, js_mknull(), js_mkundef()); 1583 1584 return js_mkundef(); 1585} 1586 1587static ant_value_t js_readable_from(ant_t *js, ant_value_t *args, int nargs) { 1588 ant_value_t ctor_args[1]; 1589 ant_value_t readable = 0; 1590 ant_value_t state_obj = 0; 1591 1592 ctor_args[0] = nargs > 1 ? 
args[1] : js_mkundef(); 1593 readable = stream_construct(js, g_readable_proto, ctor_args[0], stream_init_readable); 1594 if (is_err(readable)) return readable; 1595 1596 state_obj = js_mkobj(js); 1597 js_set(js, state_obj, "readable", readable); 1598 js_set(js, state_obj, "source", nargs > 0 ? args[0] : js_mkundef()); 1599 js_set(js, state_obj, "iterator", js_mkundef()); 1600 js_set(js, state_obj, "reader", js_mkundef()); 1601 js_set(js, state_obj, "mode", js_mkundef()); 1602 stream_schedule_microtask(js, stream_readable_from_start, state_obj); 1603 1604 return readable; 1605} 1606 1607static ant_value_t js_readable_from_web(ant_t *js, ant_value_t *args, int nargs) { 1608 return js_readable_from(js, args, nargs); 1609} 1610 1611static ant_value_t js_stream_ctor(ant_t *js, ant_value_t *args, int nargs) { 1612 return stream_construct(js, g_stream_proto, nargs > 0 ? args[0] : js_mkundef(), stream_init_base); 1613} 1614 1615static ant_value_t js_readable_ctor(ant_t *js, ant_value_t *args, int nargs) { 1616 return stream_construct(js, g_readable_proto, nargs > 0 ? args[0] : js_mkundef(), stream_init_readable); 1617} 1618 1619static ant_value_t js_writable_ctor(ant_t *js, ant_value_t *args, int nargs) { 1620 return stream_construct(js, g_writable_proto, nargs > 0 ? args[0] : js_mkundef(), stream_init_writable); 1621} 1622 1623static ant_value_t js_duplex_ctor(ant_t *js, ant_value_t *args, int nargs) { 1624 ant_value_t proto = js_instance_proto_from_new_target(js, g_duplex_proto); 1625 ant_value_t obj = stream_make_base_object(js, is_object_type(proto) ? proto : g_duplex_proto); 1626 ant_value_t options = nargs > 0 ? args[0] : js_mkundef(); 1627 ant_value_t options_obj = is_object_type(options) ? 
options : js_mkobj(js); 1628 1629 stream_init_readable(js, obj, options); 1630 stream_init_writable(js, obj, options); 1631 1632 js_set(js, obj, "readable", js_true); 1633 js_set(js, obj, "writable", js_true); 1634 js_set(js, obj, "allowHalfOpen", js_bool(stream_get_option(js, options_obj, "allowHalfOpen") != js_false)); 1635 1636 return obj; 1637} 1638 1639static ant_value_t js_transform_ctor(ant_t *js, ant_value_t *args, int nargs) { 1640 ant_value_t proto = js_instance_proto_from_new_target(js, g_transform_proto); 1641 ant_value_t obj = stream_make_base_object(js, is_object_type(proto) ? proto : g_transform_proto); 1642 ant_value_t options = nargs > 0 ? args[0] : js_mkundef(); 1643 ant_value_t options_obj = is_object_type(options) ? options : js_mkobj(js); 1644 1645 ant_value_t transform_fn = 0; 1646 ant_value_t flush_fn = 0; 1647 1648 stream_init_readable(js, obj, options); 1649 stream_init_writable(js, obj, options); 1650 1651 js_set(js, obj, "readable", js_true); 1652 js_set(js, obj, "writable", js_true); 1653 js_set(js, obj, "allowHalfOpen", js_bool(stream_get_option(js, options_obj, "allowHalfOpen") != js_false)); 1654 1655 transform_fn = stream_get_option(js, options_obj, "transform"); 1656 flush_fn = stream_get_option(js, options_obj, "flush"); 1657 1658 if (is_callable(transform_fn)) js_set(js, obj, "_transform", transform_fn); 1659 if (is_callable(flush_fn)) js_set(js, obj, "_flush", flush_fn); 1660 1661 return obj; 1662} 1663 1664static ant_value_t js_passthrough_ctor(ant_t *js, ant_value_t *args, int nargs) { 1665 ant_value_t proto = js_instance_proto_from_new_target(js, g_passthrough_proto); 1666 ant_value_t obj = stream_make_base_object(js, is_object_type(proto) ? proto : g_passthrough_proto); 1667 ant_value_t options = nargs > 0 ? args[0] : js_mkundef(); 1668 ant_value_t options_obj = is_object_type(options) ? 
options : js_mkobj(js); 1669 1670 stream_init_readable(js, obj, options); 1671 stream_init_writable(js, obj, options); 1672 1673 js_set(js, obj, "readable", js_true); 1674 js_set(js, obj, "writable", js_true); 1675 js_set(js, obj, "allowHalfOpen", js_bool(stream_get_option(js, options_obj, "allowHalfOpen") != js_false)); 1676 1677 return obj; 1678} 1679 1680void stream_init_constructors(ant_t *js) { 1681 ant_value_t events = 0; 1682 ant_value_t ee_ctor = 0; 1683 ant_value_t ee_proto = 0; 1684 1685 if (g_stream_ctor) return; 1686 1687 events = events_library(js); 1688 ee_ctor = js_get(js, events, "EventEmitter"); 1689 ee_proto = js_get(js, ee_ctor, "prototype"); 1690 1691 g_stream_proto = js_mkobj(js); 1692 js_set_proto_init(g_stream_proto, ee_proto); 1693 js_set(js, g_stream_proto, "pipe", js_mkfun(js_stream_pipe)); 1694 js_set(js, g_stream_proto, "unpipe", js_mkfun(js_stream_unpipe)); 1695 js_set(js, g_stream_proto, "pause", js_mkfun(js_stream_pause)); 1696 js_set(js, g_stream_proto, "resume", js_mkfun(js_stream_resume)); 1697 js_set(js, g_stream_proto, "isPaused", js_mkfun(js_stream_is_paused)); 1698 js_set(js, g_stream_proto, "destroy", js_mkfun(js_stream_destroy)); 1699 js_set_sym(js, g_stream_proto, get_toStringTag_sym(), js_mkstr(js, "Stream", 6)); 1700 g_stream_ctor = js_make_ctor(js, js_stream_ctor, g_stream_proto, "Stream", 6); 1701 1702 g_readable_proto = js_mkobj(js); 1703 js_set_proto_init(g_readable_proto, g_stream_proto); 1704 js_set(js, g_readable_proto, "_read", js_mkfun(js_readable__read)); 1705 js_set(js, g_readable_proto, "push", js_mkfun(js_readable_push)); 1706 js_set(js, g_readable_proto, "read", js_mkfun(js_readable_read)); 1707 js_set(js, g_readable_proto, "setEncoding", js_mkfun(js_readable_set_encoding)); 1708 js_set(js, g_readable_proto, "on", js_mkfun(js_readable_on)); 1709 js_set(js, g_readable_proto, "resume", js_mkfun(js_readable_resume)); 1710 js_set(js, g_readable_proto, "pause", js_mkfun(js_readable_pause)); 1711 js_set_sym(js, 
g_readable_proto, get_toStringTag_sym(), js_mkstr(js, "Readable", 8)); 1712 g_readable_ctor = js_make_ctor(js, js_readable_ctor, g_readable_proto, "Readable", 8); 1713 js_set(js, g_readable_ctor, "from", js_mkfun(js_readable_from)); 1714 js_set(js, g_readable_ctor, "fromWeb", js_mkfun(js_readable_from_web)); 1715 1716 g_writable_proto = js_mkobj(js); 1717 js_set_proto_init(g_writable_proto, g_stream_proto); 1718 js_set(js, g_writable_proto, "_write", js_mkfun(js_writable__write)); 1719 js_set(js, g_writable_proto, "_final", js_mkfun(js_writable__final)); 1720 js_set(js, g_writable_proto, "write", js_mkfun(js_writable_write)); 1721 js_set(js, g_writable_proto, "end", js_mkfun(js_writable_end)); 1722 js_set(js, g_writable_proto, "cork", js_mkfun(stream_noop)); 1723 js_set(js, g_writable_proto, "uncork", js_mkfun(stream_noop)); 1724 js_set_sym(js, g_writable_proto, get_toStringTag_sym(), js_mkstr(js, "Writable", 8)); 1725 g_writable_ctor = js_make_ctor(js, js_writable_ctor, g_writable_proto, "Writable", 8); 1726 1727 g_duplex_proto = js_mkobj(js); 1728 js_set_proto_init(g_duplex_proto, g_readable_proto); 1729 js_set(js, g_duplex_proto, "_write", js_mkfun(js_writable__write)); 1730 js_set(js, g_duplex_proto, "_final", js_mkfun(js_writable__final)); 1731 js_set(js, g_duplex_proto, "write", js_mkfun(js_writable_write)); 1732 js_set(js, g_duplex_proto, "end", js_mkfun(js_writable_end)); 1733 js_set(js, g_duplex_proto, "cork", js_mkfun(stream_noop)); 1734 js_set(js, g_duplex_proto, "uncork", js_mkfun(stream_noop)); 1735 js_set_sym(js, g_duplex_proto, get_toStringTag_sym(), js_mkstr(js, "Duplex", 6)); 1736 g_duplex_ctor = js_make_ctor(js, js_duplex_ctor, g_duplex_proto, "Duplex", 6); 1737 1738 g_transform_proto = js_mkobj(js); 1739 js_set_proto_init(g_transform_proto, g_duplex_proto); 1740 js_set(js, g_transform_proto, "_transform", js_mkfun(js_transform__transform)); 1741 js_set(js, g_transform_proto, "_write", js_mkfun(js_transform__write)); 1742 js_set(js, 
g_transform_proto, "_final", js_mkfun(js_transform__final)); 1743 js_set_sym(js, g_transform_proto, get_toStringTag_sym(), js_mkstr(js, "Transform", 9)); 1744 g_transform_ctor = js_make_ctor(js, js_transform_ctor, g_transform_proto, "Transform", 9); 1745 1746 g_passthrough_proto = js_mkobj(js); 1747 js_set_proto_init(g_passthrough_proto, g_transform_proto); 1748 js_set(js, g_passthrough_proto, "_transform", js_mkfun(js_passthrough__transform)); 1749 js_set_sym(js, g_passthrough_proto, get_toStringTag_sym(), js_mkstr(js, "PassThrough", 11)); 1750 g_passthrough_ctor = js_make_ctor(js, js_passthrough_ctor, g_passthrough_proto, "PassThrough", 11); 1751 1752 gc_register_root(&g_stream_proto); 1753 gc_register_root(&g_stream_ctor); 1754 gc_register_root(&g_readable_proto); 1755 gc_register_root(&g_readable_ctor); 1756 gc_register_root(&g_writable_proto); 1757 gc_register_root(&g_writable_ctor); 1758 gc_register_root(&g_duplex_proto); 1759 gc_register_root(&g_duplex_ctor); 1760 gc_register_root(&g_transform_proto); 1761 gc_register_root(&g_transform_ctor); 1762 gc_register_root(&g_passthrough_proto); 1763 gc_register_root(&g_passthrough_ctor); 1764} 1765 1766ant_value_t stream_readable_constructor(ant_t *js) { 1767 stream_init_constructors(js); 1768 return g_readable_ctor; 1769} 1770 1771ant_value_t stream_writable_constructor(ant_t *js) { 1772 stream_init_constructors(js); 1773 return g_writable_ctor; 1774} 1775 1776ant_value_t stream_readable_prototype(ant_t *js) { 1777 stream_init_constructors(js); 1778 return g_readable_proto; 1779} 1780 1781ant_value_t stream_writable_prototype(ant_t *js) { 1782 stream_init_constructors(js); 1783 return g_writable_proto; 1784} 1785 1786ant_value_t stream_duplex_prototype(ant_t *js) { 1787 stream_init_constructors(js); 1788 return g_duplex_proto; 1789} 1790 1791ant_value_t stream_construct_readable(ant_t *js, ant_value_t base_proto, ant_value_t options) { 1792 stream_init_constructors(js); 1793 return stream_construct(js, base_proto, 
options, stream_init_readable); 1794} 1795 1796ant_value_t stream_construct_writable(ant_t *js, ant_value_t base_proto, ant_value_t options) { 1797 stream_init_constructors(js); 1798 return stream_construct(js, base_proto, options, stream_init_writable); 1799} 1800 1801void stream_init_readable_object(ant_t *js, ant_value_t obj, ant_value_t options) { 1802 stream_init_constructors(js); 1803 if (!is_object_type(obj)) return; 1804 js_set_native_tag(obj, STREAM_NATIVE_TAG); 1805 stream_init_readable(js, obj, options); 1806} 1807 1808void stream_init_writable_object(ant_t *js, ant_value_t obj, ant_value_t options) { 1809 stream_init_constructors(js); 1810 if (!is_object_type(obj)) return; 1811 js_set_native_tag(obj, STREAM_NATIVE_TAG); 1812 stream_init_writable(js, obj, options); 1813} 1814 1815void stream_init_duplex_object(ant_t *js, ant_value_t obj, ant_value_t options) { 1816 stream_init_constructors(js); 1817 if (!is_object_type(obj)) return; 1818 js_set_native_tag(obj, STREAM_NATIVE_TAG); 1819 stream_init_readable(js, obj, options); 1820 stream_init_writable(js, obj, options); 1821} 1822 1823ant_value_t stream_readable_push(ant_t *js, ant_value_t stream_obj, ant_value_t chunk, ant_value_t encoding) { 1824 stream_init_constructors(js); 1825 return stream_readable_push_value(js, stream_obj, chunk, encoding); 1826} 1827 1828static ant_value_t js_stream_get_default_high_water_mark(ant_t *js, ant_value_t *args, int nargs) { 1829 bool object_mode = nargs > 0 && js_truthy(js, args[0]); 1830 return js_mknum(stream_default_high_water_mark(object_mode)); 1831} 1832 1833static ant_value_t js_stream_set_default_high_water_mark(ant_t *js, ant_value_t *args, int nargs) { 1834 if (nargs < 2 || vtype(args[1]) != T_NUM || js_getnum(args[1]) < 0) 1835 return js_mkerr_typed(js, JS_ERR_RANGE, "setDefaultHighWaterMark requires a non-negative number"); 1836 1837 bool object_mode = js_truthy(js, args[0]); 1838 if (object_mode) g_default_object_high_water_mark = js_getnum(args[1]); 1839 
else g_default_high_water_mark = js_getnum(args[1]); 1840 1841 return js_mkundef(); 1842} 1843 1844static void stream_web_copy_global(ant_t *js, ant_value_t obj, const char *name) { 1845 ant_value_t value = js_get(js, js->global, name); 1846 if (is_err(value)) return; 1847 js_set(js, obj, name, value); 1848} 1849 1850// TODO: remove copy-on-start 1851static void stream_web_define_common(ant_t *js, ant_value_t obj) { 1852 stream_web_copy_global(js, obj, "ReadableStream"); 1853 stream_web_copy_global(js, obj, "ReadableStreamDefaultReader"); 1854 stream_web_copy_global(js, obj, "ReadableStreamDefaultController"); 1855 stream_web_copy_global(js, obj, "WritableStream"); 1856 stream_web_copy_global(js, obj, "WritableStreamDefaultWriter"); 1857 stream_web_copy_global(js, obj, "WritableStreamDefaultController"); 1858 stream_web_copy_global(js, obj, "TransformStream"); 1859 stream_web_copy_global(js, obj, "TransformStreamDefaultController"); 1860 stream_web_copy_global(js, obj, "ByteLengthQueuingStrategy"); 1861 stream_web_copy_global(js, obj, "CountQueuingStrategy"); 1862 stream_web_copy_global(js, obj, "TextEncoderStream"); 1863 stream_web_copy_global(js, obj, "TextDecoderStream"); 1864 stream_web_copy_global(js, obj, "CompressionStream"); 1865 stream_web_copy_global(js, obj, "DecompressionStream"); 1866} 1867 1868ant_value_t stream_library(ant_t *js) { 1869 ant_value_t lib = js_mkobj(js); 1870 ant_value_t promises = js_mkobj(js); 1871 stream_init_constructors(js); 1872 1873 js_set(js, promises, "pipeline", js_mkfun(js_stream_promises_pipeline)); 1874 js_set(js, promises, "finished", js_mkfun(js_stream_promises_finished)); 1875 1876 js_set_module_default(js, lib, g_stream_ctor, "Stream"); 1877 js_set(js, lib, "Readable", g_readable_ctor); 1878 js_set(js, lib, "Writable", g_writable_ctor); 1879 js_set(js, lib, "Duplex", g_duplex_ctor); 1880 js_set(js, lib, "Transform", g_transform_ctor); 1881 js_set(js, lib, "PassThrough", g_passthrough_ctor); 1882 js_set(js, lib, 
"pipeline", js_mkfun(js_stream_pipeline)); 1883 js_set(js, lib, "finished", js_mkfun(js_stream_finished)); 1884 js_set(js, lib, "getDefaultHighWaterMark", js_mkfun(js_stream_get_default_high_water_mark)); 1885 js_set(js, lib, "setDefaultHighWaterMark", js_mkfun(js_stream_set_default_high_water_mark)); 1886 js_set(js, lib, "promises", promises); 1887 1888 js_set(js, g_stream_ctor, "Readable", g_readable_ctor); 1889 js_set(js, g_stream_ctor, "Writable", g_writable_ctor); 1890 js_set(js, g_stream_ctor, "Duplex", g_duplex_ctor); 1891 js_set(js, g_stream_ctor, "Transform", g_transform_ctor); 1892 js_set(js, g_stream_ctor, "PassThrough", g_passthrough_ctor); 1893 js_set(js, g_stream_ctor, "pipeline", js_get(js, lib, "pipeline")); 1894 js_set(js, g_stream_ctor, "finished", js_get(js, lib, "finished")); 1895 js_set(js, g_stream_ctor, "getDefaultHighWaterMark", js_get(js, lib, "getDefaultHighWaterMark")); 1896 js_set(js, g_stream_ctor, "setDefaultHighWaterMark", js_get(js, lib, "setDefaultHighWaterMark")); 1897 js_set(js, g_stream_ctor, "promises", promises); 1898 1899 js_set(js, promises, "default", promises); 1900 js_set_slot_wb(js, promises, SLOT_DEFAULT, promises); 1901 js_set_sym(js, lib, get_toStringTag_sym(), js_mkstr(js, "stream", 6)); 1902 1903 return lib; 1904} 1905 1906ant_value_t stream_promises_library(ant_t *js) { 1907 ant_value_t stream_ns = js_esm_import_sync_cstr(js, "stream", 6); 1908 ant_value_t promises = 0; 1909 1910 if (is_err(stream_ns)) return stream_ns; 1911 promises = js_get(js, stream_ns, "promises"); 1912 1913 if (!is_object_type(promises)) return js_mkerr(js, "stream.promises is not available"); 1914 js_set(js, promises, "default", promises); 1915 js_set_slot_wb(js, promises, SLOT_DEFAULT, promises); 1916 1917 return promises; 1918} 1919 1920ant_value_t stream_web_library(ant_t *js) { 1921 ant_value_t lib = js_mkobj(js); 1922 1923 stream_web_define_common(js, lib); 1924 js_set(js, lib, "default", lib); 1925 js_set_slot_wb(js, lib, SLOT_DEFAULT, 
lib); 1926 js_set_sym(js, lib, get_toStringTag_sym(), js_mkstr(js, "stream/web", 10)); 1927 1928 return lib; 1929}