#include #include #include "ant.h" #include "ptr.h" #include "internal.h" #include "silver/engine.h" #include "esm/loader.h" #include "gc/roots.h" #include "modules/assert.h" #include "modules/buffer.h" #include "modules/events.h" #include "modules/stream.h" #include "modules/symbol.h" #include "modules/string_decoder.h" enum { STREAM_NATIVE_TAG = 0x5354524Du }; // STRM static ant_value_t g_stream_proto = 0; static ant_value_t g_stream_ctor = 0; static ant_value_t g_readable_proto = 0; static ant_value_t g_readable_ctor = 0; static ant_value_t g_writable_proto = 0; static ant_value_t g_writable_ctor = 0; static ant_value_t g_duplex_proto = 0; static ant_value_t g_duplex_ctor = 0; static ant_value_t g_transform_proto = 0; static ant_value_t g_transform_ctor = 0; static ant_value_t g_passthrough_proto = 0; static ant_value_t g_passthrough_ctor = 0; static double g_default_high_water_mark = 16384.0; static double g_default_object_high_water_mark = 16.0; static ant_value_t stream_noop(ant_t *js, ant_value_t *args, int nargs) { return js_mkundef(); } static bool stream_is_instance(ant_value_t value) { return is_object_type(value) && js_check_native_tag(value, STREAM_NATIVE_TAG); } static inline void stream_set_end_callback(ant_t *js, ant_value_t stream_obj, ant_value_t callback) { js_set_slot_wb(js, stream_obj, SLOT_AUX, callback); } static stream_private_state_t *stream_private_state(ant_value_t stream_obj) { if (!stream_is_instance(stream_obj)) return NULL; return (stream_private_state_t *)js_get_native_ptr(stream_obj); } static ant_value_t stream_require_this(ant_t *js, ant_value_t value, const char *label) { if (!stream_is_instance(value)) return js_mkerr_typed(js, JS_ERR_TYPE, "Invalid %s", label); return value; } static ant_value_t stream_truthy_or_object(ant_t *js, ant_value_t value) { return js_truthy(js, value) ? 
value : js_mkobj(js); } static ant_value_t stream_readable_state(ant_t *js, ant_value_t stream_obj) { return js_get(js, stream_obj, "_readableState"); } static ant_value_t stream_writable_state(ant_t *js, ant_value_t stream_obj) { return js_get(js, stream_obj, "_writableState"); } static ant_value_t stream_pipes(ant_t *js, ant_value_t stream_obj) { return js_get(js, stream_obj, "_pipes"); } static bool stream_key_is_cstr(ant_t *js, ant_value_t value, const char *expected) { size_t len = 0; const char *s = NULL; if (vtype(value) != T_STR) return false; s = js_getstr(js, value, &len); return s && len == strlen(expected) && memcmp(s, expected, len) == 0; } static ant_value_t stream_event_key(ant_t *js, ant_value_t value) { uint8_t t = vtype(value); if (t == T_STR || t == T_SYMBOL) return value; return js_mkerr(js, "event must be a string or Symbol"); } void *stream_get_attached_state(ant_value_t stream_obj) { stream_private_state_t *priv = stream_private_state(stream_obj); return priv ? priv->attached_state : NULL; } void stream_set_attached_state( ant_value_t stream_obj, void *state, stream_finalize_fn finalize ) { stream_private_state_t *priv = stream_private_state(stream_obj); if (!priv) return; priv->attached_state = state; priv->attached_state_finalize = finalize; } void stream_clear_attached_state(ant_value_t stream_obj) { stream_private_state_t *priv = stream_private_state(stream_obj); if (!priv) return; priv->attached_state = NULL; priv->attached_state_finalize = NULL; } static void stream_finalize(ant_t *js, ant_object_t *obj) { ant_value_t stream_obj = js_obj_from_ptr(obj); stream_private_state_t *priv = stream_private_state(stream_obj); if (!priv) return; js_set_native_ptr(stream_obj, NULL); if (priv->attached_state && priv->attached_state_finalize) priv->attached_state_finalize(js, stream_obj, priv->attached_state); free(priv); } static ant_value_t stream_call( ant_t *js, ant_value_t fn, ant_value_t this_val, ant_value_t *args, int nargs, bool is_ctor ) { 
if (!is_callable(fn)) return js_mkundef(); if (sv_check_c_stack_overflow(js)) return js_mkerr_typed(js, JS_ERR_RANGE | JS_ERR_NO_STACK, "Maximum call stack size exceeded"); sv_call_mode_t mode = is_ctor ? SV_CALL_MODE_CONSTRUCT : SV_CALL_MODE_NORMAL; sv_call_plan_t plan; ant_value_t err = sv_prepare_call(js->vm, js, fn, this_val, args, nargs, NULL, mode, &plan); if (is_err(err)) return err; return sv_execute_call_plan(js->vm, js, &plan, NULL); } static ant_value_t stream_call_prop( ant_t *js, ant_value_t target, const char *name, ant_value_t *args, int nargs ) { ant_value_t fn = js_getprop_fallback(js, target, name); if (is_err(fn) || !is_callable(fn)) return js_mkundef(); return stream_call(js, fn, target, args, nargs, false); } static void stream_call_callback(ant_t *js, ant_value_t fn, ant_value_t *args, int nargs) { if (!is_callable(fn)) return; stream_call(js, fn, js_mkundef(), args, nargs, false); } static void stream_schedule_microtask(ant_t *js, ant_cfunc_t fn, ant_value_t data) { ant_value_t promise = js_mkpromise(js); ant_value_t cb = js_heavy_mkfun(js, fn, data); ant_value_t then_result = 0; js_resolve_promise(js, promise, js_mkundef()); then_result = js_promise_then(js, promise, cb, js_mkundef()); promise_mark_handled(then_result); } static ant_value_t stream_buffer_ctor(ant_t *js) { ant_value_t ns = js_esm_import_sync_cstr(js, "buffer", 6); if (is_err(ns)) return ns; return js_get(js, ns, "Buffer"); } static ant_value_t stream_readable_decoder(ant_t *js, ant_value_t stream_obj) { ant_value_t state = stream_readable_state(js, stream_obj); if (!is_object_type(state)) return js_mkundef(); return js_get(js, state, "decoder"); } static ant_value_t stream_readable_decode_chunk( ant_t *js, ant_value_t stream_obj, ant_value_t chunk, bool flush ) { ant_value_t decoder = stream_readable_decoder(js, stream_obj); if (!is_object_type(decoder)) return chunk; return string_decoder_decode_value(js, decoder, chunk, flush); } static bool 
stream_value_is_empty_string(ant_t *js, ant_value_t value) { size_t len = 0; if (vtype(value) != T_STR) return false; (void)js_getstr(js, value, &len); return len == 0; } static ant_value_t stream_make_buffer(ant_t *js, ant_value_t value, ant_value_t encoding) { ant_value_t buffer_ctor = stream_buffer_ctor(js); ant_value_t from_fn = 0; ant_value_t args[2]; if (is_err(buffer_ctor)) return buffer_ctor; from_fn = js_get(js, buffer_ctor, "from"); if (is_err(from_fn) || !is_callable(from_fn)) return js_mkerr(js, "Buffer.from is not available"); args[0] = value; args[1] = encoding; return stream_call(js, from_fn, buffer_ctor, args, 2, false); } static ant_value_t stream_normalize_chunk( ant_t *js, ant_value_t chunk, bool object_mode, ant_value_t encoding ) { ant_value_t str_val = 0; if ( object_mode || is_null(chunk) || is_undefined(chunk) || vtype(chunk) == T_TYPEDARRAY || buffer_is_binary_source(chunk) ) return chunk; if (vtype(chunk) == T_STR) return stream_make_buffer(js, chunk, encoding); str_val = js_tostring_val(js, chunk); if (is_err(str_val)) return str_val; return stream_make_buffer(js, str_val, encoding); } static ant_value_t stream_readable_buffer(ant_t *js, ant_value_t stream_obj) { ant_value_t state = stream_readable_state(js, stream_obj); if (!is_object_type(state)) return js_mkundef(); return js_get(js, state, "buffer"); } static ant_offset_t stream_readable_buffer_head(ant_t *js, ant_value_t stream_obj) { ant_value_t state = stream_readable_state(js, stream_obj); ant_value_t head = is_object_type(state) ? js_get(js, state, "bufferHead") : js_mkundef(); return vtype(head) == T_NUM ? 
(ant_offset_t)js_getnum(head) : 0; } static void stream_set_readable_buffer_head(ant_t *js, ant_value_t stream_obj, ant_offset_t head) { ant_value_t state = stream_readable_state(js, stream_obj); if (is_object_type(state)) js_set(js, state, "bufferHead", js_mknum((double)head)); } static ant_offset_t stream_readable_buffer_len(ant_t *js, ant_value_t stream_obj) { ant_value_t buffer = stream_readable_buffer(js, stream_obj); ant_offset_t head = stream_readable_buffer_head(js, stream_obj); ant_offset_t len = vtype(buffer) == T_ARR ? js_arr_len(js, buffer) : 0; return len > head ? len - head : 0; } static void stream_compact_readable_buffer(ant_t *js, ant_value_t stream_obj) { ant_value_t state = stream_readable_state(js, stream_obj); ant_value_t buffer = stream_readable_buffer(js, stream_obj); ant_offset_t head = stream_readable_buffer_head(js, stream_obj); ant_offset_t len = vtype(buffer) == T_ARR ? js_arr_len(js, buffer) : 0; ant_value_t compact = 0; if (!is_object_type(state) || vtype(buffer) != T_ARR) return; if (head == 0) return; if (head >= len) { compact = js_mkarr(js); js_set(js, state, "buffer", compact); js_set(js, state, "bufferHead", js_mknum(0)); return; } if (head <= 32 && head * 2 < len) return; compact = js_mkarr(js); for (ant_offset_t i = head; i < len; i++) js_arr_push(js, compact, js_arr_get(js, buffer, i)); js_set(js, state, "buffer", compact); js_set(js, state, "bufferHead", js_mknum(0)); } static void stream_buffer_push(ant_t *js, ant_value_t stream_obj, ant_value_t value) { ant_value_t state = stream_readable_state(js, stream_obj); ant_value_t buffer = stream_readable_buffer(js, stream_obj); if (!is_object_type(state) || vtype(buffer) != T_ARR) return; js_arr_push(js, buffer, value); } static ant_value_t stream_buffer_shift(ant_t *js, ant_value_t stream_obj) { ant_value_t buffer = stream_readable_buffer(js, stream_obj); ant_offset_t head = stream_readable_buffer_head(js, stream_obj); ant_offset_t len = vtype(buffer) == T_ARR ? 
js_arr_len(js, buffer) : 0; ant_value_t value = js_mkundef(); if (vtype(buffer) != T_ARR || head >= len) return js_mkundef(); value = js_arr_get(js, buffer, head); stream_set_readable_buffer_head(js, stream_obj, head + 1); stream_compact_readable_buffer(js, stream_obj); return value; } static bool stream_listener_count_positive(ant_t *js, ant_value_t target, const char *event_name) { ant_value_t args[1]; ant_value_t result = 0; args[0] = js_mkstr(js, event_name, strlen(event_name)); result = stream_call_prop(js, target, "listenerCount", args, 1); return vtype(result) == T_NUM && js_getnum(result) > 0; } static void stream_remove_listener( ant_t *js, ant_value_t target, const char *event_name, ant_value_t listener ) { ant_value_t args[2]; args[0] = js_mkstr(js, event_name, strlen(event_name)); args[1] = listener; stream_call_prop(js, target, "removeListener", args, 2); } static ant_value_t stream_get_option(ant_t *js, ant_value_t options, const char *name) { if (!is_object_type(options)) return js_mkundef(); return js_get(js, options, name); } static double stream_default_high_water_mark(bool object_mode) { return object_mode ? g_default_object_high_water_mark : g_default_high_water_mark; } static double stream_high_water_mark_from_options(ant_t *js, ant_value_t options, bool object_mode) { ant_value_t hwm = stream_get_option(js, options, "highWaterMark"); return (vtype(hwm) == T_NUM && js_getnum(hwm) > 0) ? 
js_getnum(hwm) : stream_default_high_water_mark(object_mode); } static ant_value_t stream_make_base_object(ant_t *js, ant_value_t proto) { ant_value_t obj = js_mkobj(js); stream_private_state_t *priv = calloc(1, sizeof(*priv)); if (is_object_type(proto)) js_set_proto_init(obj, proto); js_set_native_tag(obj, STREAM_NATIVE_TAG); if (priv) js_set_native_ptr(obj, priv); js_set_slot(obj, SLOT_AUX, js_mkundef()); js_set_finalizer(obj, stream_finalize); return obj; } static void stream_init_base(ant_t *js, ant_value_t obj, ant_value_t raw_options) { ant_value_t pipes = js_mkarr(js); js_set(js, obj, "readable", js_true); js_set(js, obj, "writable", js_true); js_set(js, obj, "destroyed", js_false); js_set(js, obj, "_paused", js_false); js_set(js, obj, "_pipes", pipes); js_set(js, obj, "_streamOptions", stream_truthy_or_object(js, raw_options)); } static void stream_init_readable(ant_t *js, ant_value_t obj, ant_value_t raw_options) { ant_value_t options = is_object_type(raw_options) ? raw_options : js_mkobj(js); ant_value_t state = js_mkobj(js); ant_value_t read_fn = stream_get_option(js, options, "read"); bool object_mode = js_truthy(js, stream_get_option(js, options, "objectMode")); double high_water_mark = stream_high_water_mark_from_options(js, options, object_mode); stream_init_base(js, obj, raw_options); js_set(js, obj, "readable", js_true); js_set(js, obj, "writable", js_false); js_set(js, obj, "readableEnded", js_false); js_set(js, state, "objectMode", js_bool(object_mode)); js_set(js, state, "ended", js_false); js_set(js, state, "endEmitted", js_false); js_set(js, state, "flowing", js_false); js_set(js, state, "flowingReadScheduled", js_false); js_set(js, state, "reading", js_false); js_set(js, state, "highWaterMark", js_mknum(high_water_mark)); js_set(js, state, "buffer", js_mkarr(js)); js_set(js, state, "bufferHead", js_mknum(0)); js_set(js, obj, "_readableState", state); if (is_callable(read_fn)) js_set(js, obj, "_read", read_fn); } static void 
stream_init_writable(ant_t *js, ant_value_t obj, ant_value_t raw_options) { ant_value_t options = is_object_type(raw_options) ? raw_options : js_mkobj(js); ant_value_t state = js_mkobj(js); bool object_mode = js_truthy(js, stream_get_option(js, options, "objectMode")) || js_truthy(js, stream_get_option(js, options, "writableObjectMode")); ant_value_t write_fn = stream_get_option(js, options, "write"); ant_value_t final_fn = stream_get_option(js, options, "final"); ant_value_t destroy_fn = stream_get_option(js, options, "destroy"); stream_init_base(js, obj, raw_options); js_set(js, obj, "readable", js_false); js_set(js, obj, "writable", js_true); js_set(js, obj, "writableEnded", js_false); js_set(js, obj, "writableFinished", js_false); js_set(js, state, "objectMode", js_bool(object_mode)); js_set(js, state, "finished", js_false); js_set(js, state, "ended", js_false); js_set(js, obj, "_writableState", state); if (is_callable(write_fn)) js_set(js, obj, "_write", write_fn); if (is_callable(final_fn)) js_set(js, obj, "_final", final_fn); if (is_callable(destroy_fn)) js_set(js, obj, "_destroy", destroy_fn); } static ant_value_t stream_construct( ant_t *js, ant_value_t base_proto, ant_value_t raw_options, void (*init_fn)(ant_t *, ant_value_t, ant_value_t) ) { ant_value_t proto = js_instance_proto_from_new_target(js, base_proto); ant_value_t obj = stream_make_base_object(js, is_object_type(proto) ? 
proto : base_proto); init_fn(js, obj, raw_options); return obj; } static ant_value_t stream_emit_named(ant_t *js, ant_value_t stream_obj, const char *event_name) { return js_bool(eventemitter_emit_args(js, stream_obj, event_name, NULL, 0)); } static void stream_emit_error(ant_t *js, ant_value_t stream_obj, ant_value_t error) { ant_value_t args[1]; args[0] = error; eventemitter_emit_args(js, stream_obj, "error", args, 1); } static void stream_readable_schedule_continue_flowing(ant_t *js, ant_value_t stream_obj) { ant_value_t state = stream_readable_state(js, stream_obj); if (!is_object_type(state)) return; if (!js_truthy(js, js_get(js, state, "flowing"))) return; if (js_truthy(js, js_get(js, stream_obj, "destroyed"))) return; if (js_truthy(js, js_get(js, state, "ended"))) return; if (stream_readable_buffer_len(js, stream_obj) > 0) return; if (js_truthy(js, js_get(js, state, "flowingReadScheduled"))) return; js_set(js, state, "flowingReadScheduled", js_true); stream_schedule_microtask(js, stream_readable_continue_flowing, stream_obj); } static ant_value_t js_stream_pause(ant_t *js, ant_value_t *args, int nargs) { ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "stream"); if (is_err(stream_obj)) return stream_obj; js_set(js, stream_obj, "_paused", js_true); stream_emit_named(js, stream_obj, "pause"); return stream_obj; } static ant_value_t js_stream_resume(ant_t *js, ant_value_t *args, int nargs) { ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "stream"); if (is_err(stream_obj)) return stream_obj; js_set(js, stream_obj, "_paused", js_false); stream_emit_named(js, stream_obj, "resume"); return stream_obj; } static ant_value_t js_stream_is_paused(ant_t *js, ant_value_t *args, int nargs) { ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "stream"); ant_value_t paused = 0; if (is_err(stream_obj)) return stream_obj; paused = js_get(js, stream_obj, "_paused"); return js_bool(js_truthy(js, paused)); } static void 
stream_pipe_remove_state(ant_t *js, ant_value_t source, ant_value_t state_obj) { ant_value_t pipes = stream_pipes(js, source); ant_offset_t len = vtype(pipes) == T_ARR ? js_arr_len(js, pipes) : 0; ant_value_t next = js_mkarr(js); for (ant_offset_t i = 0; i < len; i++) { ant_value_t item = js_arr_get(js, pipes, i); if (item != state_obj) js_arr_push(js, next, item); } js_set(js, source, "_pipes", next); } static void stream_pipe_cleanup(ant_t *js, ant_value_t state_obj) { ant_value_t cleaned = js_get(js, state_obj, "cleaned"); ant_value_t source = js_get(js, state_obj, "source"); ant_value_t dest = js_get(js, state_obj, "dest"); ant_value_t on_data = js_get(js, state_obj, "onData"); ant_value_t on_drain = js_get(js, state_obj, "onDrain"); ant_value_t on_end = js_get(js, state_obj, "onEnd"); ant_value_t on_close = js_get(js, state_obj, "onClose"); ant_value_t on_error = js_get(js, state_obj, "onError"); if (js_truthy(js, cleaned)) return; js_set(js, state_obj, "cleaned", js_true); if (stream_is_instance(source)) { stream_remove_listener(js, source, "data", on_data); stream_remove_listener(js, source, "end", on_end); stream_remove_listener(js, source, "close", on_close); stream_remove_listener(js, source, "error", on_error); stream_pipe_remove_state(js, source, state_obj); } if (is_object_type(dest)) stream_remove_listener(js, dest, "drain", on_drain); } static ant_value_t stream_pipe_on_data(ant_t *js, ant_value_t *args, int nargs) { ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); ant_value_t source = js_get(js, state_obj, "source"); ant_value_t dest = js_get(js, state_obj, "dest"); ant_value_t result = js_mkundef(); if (!is_object_type(dest)) return js_mkundef(); result = stream_call_prop(js, dest, "write", nargs > 0 ? &args[0] : NULL, nargs > 0 ? 
1 : 0); if (is_err(result)) return result; if (result == js_false) stream_call_prop(js, source, "pause", NULL, 0); return js_mkundef(); } static ant_value_t stream_pipe_on_drain(ant_t *js, ant_value_t *args, int nargs) { ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); ant_value_t source = js_get(js, state_obj, "source"); stream_call_prop(js, source, "resume", NULL, 0); return js_mkundef(); } static ant_value_t stream_pipe_on_end(ant_t *js, ant_value_t *args, int nargs) { ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); ant_value_t dest = js_get(js, state_obj, "dest"); bool end_dest = js_truthy(js, js_get(js, state_obj, "end")); stream_pipe_cleanup(js, state_obj); if (end_dest) stream_call_prop(js, dest, "end", NULL, 0); return js_mkundef(); } static ant_value_t stream_pipe_on_close(ant_t *js, ant_value_t *args, int nargs) { ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); stream_pipe_cleanup(js, state_obj); return js_mkundef(); } static ant_value_t stream_pipe_on_error(ant_t *js, ant_value_t *args, int nargs) { ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); ant_value_t dest = js_get(js, state_obj, "dest"); stream_pipe_cleanup(js, state_obj); if (is_object_type(dest) && stream_listener_count_positive(js, dest, "error") && nargs > 0) eventemitter_emit_args(js, dest, "error", &args[0], 1); return js_mkundef(); } static ant_value_t js_stream_pipe(ant_t *js, ant_value_t *args, int nargs) { ant_value_t source = stream_require_this(js, js_getthis(js), "stream"); ant_value_t options = nargs > 1 ? 
args[1] : js_mkundef(); ant_value_t state_obj = 0; ant_value_t readable_state = 0; bool end_dest = true; if (is_err(source)) return source; if (nargs < 1 || !is_object_type(args[0])) return js_mkerr(js, "pipe requires a destination stream"); if (is_object_type(options)) { ant_value_t end_val = js_get(js, options, "end"); if (!is_undefined(end_val)) end_dest = end_val != js_false; } state_obj = js_mkobj(js); js_set(js, state_obj, "source", source); js_set(js, state_obj, "dest", args[0]); js_set(js, state_obj, "end", js_bool(end_dest)); js_set(js, state_obj, "cleaned", js_false); js_set(js, state_obj, "onData", js_heavy_mkfun(js, stream_pipe_on_data, state_obj)); js_set(js, state_obj, "onDrain", js_heavy_mkfun(js, stream_pipe_on_drain, state_obj)); js_set(js, state_obj, "onEnd", js_heavy_mkfun(js, stream_pipe_on_end, state_obj)); js_set(js, state_obj, "onClose", js_heavy_mkfun(js, stream_pipe_on_close, state_obj)); js_set(js, state_obj, "onError", js_heavy_mkfun(js, stream_pipe_on_error, state_obj)); js_arr_push(js, stream_pipes(js, source), state_obj); eventemitter_add_listener(js, source, "data", js_get(js, state_obj, "onData"), false); eventemitter_add_listener(js, source, "end", js_get(js, state_obj, "onEnd"), true); eventemitter_add_listener(js, source, "close", js_get(js, state_obj, "onClose"), true); eventemitter_add_listener(js, source, "error", js_get(js, state_obj, "onError"), false); eventemitter_add_listener(js, args[0], "drain", js_get(js, state_obj, "onDrain"), false); eventemitter_emit_args(js, args[0], "pipe", &source, 1); readable_state = stream_readable_state(js, source); if (is_object_type(readable_state)) js_set(js, readable_state, "flowing", js_true); stream_call_prop(js, source, "resume", NULL, 0); return args[0]; } static ant_value_t js_stream_unpipe(ant_t *js, ant_value_t *args, int nargs) { ant_value_t source = stream_require_this(js, js_getthis(js), "stream"); ant_value_t pipes = 0; ant_value_t matches = 0; ant_offset_t len = 0; ant_value_t 
dest = nargs > 0 ? args[0] : js_mkundef(); if (is_err(source)) return source; pipes = stream_pipes(js, source); if (vtype(pipes) != T_ARR) return source; matches = js_mkarr(js); len = js_arr_len(js, pipes); for (ant_offset_t i = 0; i < len; i++) { ant_value_t state_obj = js_arr_get(js, pipes, i); ant_value_t entry_dest = js_get(js, state_obj, "dest"); if (!is_object_type(dest) || entry_dest == dest) js_arr_push(js, matches, state_obj); } len = js_arr_len(js, matches); for (ant_offset_t i = 0; i < len; i++) stream_pipe_cleanup(js, js_arr_get(js, matches, i)); return source; } static ant_value_t stream_destroy_done(ant_t *js, ant_value_t *args, int nargs) { ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); ant_value_t stream_obj = js_get(js, state_obj, "stream"); ant_value_t destroyed_err = (nargs > 0) ? args[0] : js_mkundef(); if (!is_null(destroyed_err) && !is_undefined(destroyed_err)) stream_emit_error(js, stream_obj, destroyed_err); stream_emit_named(js, stream_obj, "close"); return js_mkundef(); } static ant_value_t stream_once_call(ant_t *js, ant_value_t *args, int nargs) { ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); ant_value_t fn = js_get(js, state_obj, "fn"); ant_value_t this_val = js_get(js, state_obj, "thisVal"); ant_value_t called = js_get(js, state_obj, "called"); if (js_truthy(js, called)) return js_mkundef(); js_set(js, state_obj, "called", js_true); return stream_call(js, fn, this_val, args, nargs, false); } static ant_value_t stream_make_once(ant_t *js, ant_value_t fn, ant_value_t this_val) { ant_value_t state_obj = js_mkobj(js); js_set(js, state_obj, "fn", fn); js_set(js, state_obj, "thisVal", this_val); js_set(js, state_obj, "called", js_false); return js_heavy_mkfun(js, stream_once_call, state_obj); } static ant_value_t js_stream_destroy(ant_t *js, ant_value_t *args, int nargs) { ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "stream"); ant_value_t destroy_fn = 0; ant_value_t 
done_state = 0; ant_value_t done = 0; ant_value_t destroy_args[2]; ant_value_t error = nargs > 0 ? args[0] : js_mkundef(); ant_value_t result = 0; if (is_err(stream_obj)) return stream_obj; if (js_truthy(js, js_get(js, stream_obj, "destroyed"))) return stream_obj; js_set(js, stream_obj, "destroyed", js_true); done_state = js_mkobj(js); js_set(js, done_state, "stream", stream_obj); done = stream_make_once(js, js_heavy_mkfun(js, stream_destroy_done, done_state), js_mkundef()); destroy_fn = js_getprop_fallback(js, stream_obj, "_destroy"); if (is_callable(destroy_fn)) { destroy_args[0] = is_undefined(error) ? js_mknull() : error; destroy_args[1] = done; result = stream_call(js, destroy_fn, stream_obj, destroy_args, 2, false); return is_err(result) ? result : stream_obj; } destroy_args[0] = is_undefined(error) ? js_mknull() : error; stream_call_callback(js, done, destroy_args, 1); return stream_obj; } static ant_value_t js_readable__read(ant_t *js, ant_value_t *args, int nargs) { return js_mkundef(); } static ant_value_t stream_readable_start_flowing(ant_t *js, ant_value_t *args, int nargs) { ant_value_t stream_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); return stream_readable_begin_flowing(js, stream_obj); } ant_value_t stream_readable_continue_flowing(ant_t *js, ant_value_t *args, int nargs) { ant_value_t stream_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); ant_value_t state = stream_readable_state(js, stream_obj); if (!is_object_type(state)) return js_mkundef(); js_set(js, state, "flowingReadScheduled", js_false); if (!js_truthy(js, js_get(js, state, "flowing"))) return js_mkundef(); if (js_truthy(js, js_get(js, stream_obj, "destroyed"))) return js_mkundef(); if (js_truthy(js, js_get(js, state, "ended"))) return js_mkundef(); stream_readable_maybe_read(js, stream_obj); stream_readable_flush(js, stream_obj); return js_mkundef(); } ant_value_t stream_readable_begin_flowing(ant_t *js, ant_value_t stream_obj) { ant_value_t state = 
stream_readable_state(js, stream_obj); if (!is_object_type(state)) return js_mkundef(); if (!js_truthy(js, js_get(js, state, "flowing"))) return js_mkundef(); { ant_value_t saved_this = js->this_val; js->this_val = stream_obj; js_stream_resume(js, NULL, 0); js->this_val = saved_this; } stream_readable_maybe_read(js, stream_obj); stream_readable_flush(js, stream_obj); return js_mkundef(); } ant_value_t stream_readable_flush(ant_t *js, ant_value_t stream_obj) { ant_value_t state = stream_readable_state(js, stream_obj); bool emitted_data = false; if (!is_object_type(state)) return js_mkundef(); while (js_truthy(js, js_get(js, state, "flowing")) && stream_readable_buffer_len(js, stream_obj) > 0) { ant_value_t chunk = stream_buffer_shift(js, stream_obj); chunk = stream_readable_decode_chunk(js, stream_obj, chunk, false); if (is_err(chunk)) return chunk; emitted_data = true; eventemitter_emit_args(js, stream_obj, "data", &chunk, 1); } if ( js_truthy(js, js_get(js, state, "ended")) && stream_readable_buffer_len(js, stream_obj) == 0 && !js_truthy(js, js_get(js, state, "endEmitted")) ) { ant_value_t tail = stream_readable_decode_chunk(js, stream_obj, js_mkundef(), true); if (is_err(tail)) return tail; if (!is_undefined(tail) && !stream_value_is_empty_string(js, tail)) { emitted_data = true; eventemitter_emit_args(js, stream_obj, "data", &tail, 1); } js_set(js, state, "endEmitted", js_true); js_set(js, stream_obj, "readableEnded", js_true); stream_emit_named(js, stream_obj, "end"); stream_emit_named(js, stream_obj, "close"); } else if (emitted_data) stream_readable_schedule_continue_flowing(js, stream_obj); return js_mkundef(); } ant_value_t stream_readable_maybe_read(ant_t *js, ant_value_t stream_obj) { ant_value_t state = stream_readable_state(js, stream_obj); ant_value_t read_fn = 0; ant_value_t args[1]; if (!is_object_type(state)) return js_mkundef(); if (js_truthy(js, js_get(js, stream_obj, "destroyed"))) return js_mkundef(); if (js_truthy(js, js_get(js, state, 
"reading"))) return js_mkundef(); if (js_truthy(js, js_get(js, state, "ended"))) return js_mkundef(); if (stream_readable_buffer_len(js, stream_obj) > 0) return js_mkundef(); read_fn = js_getprop_fallback(js, stream_obj, "_read"); js_set(js, state, "reading", js_true); args[0] = js_get(js, state, "highWaterMark"); if (is_callable(read_fn)) stream_call(js, read_fn, stream_obj, args, 1, false); js_set(js, state, "reading", js_false); return js_mkundef(); } ant_value_t stream_readable_push_value( ant_t *js, ant_value_t stream_obj, ant_value_t chunk, ant_value_t encoding ) { ant_value_t state = stream_readable_state(js, stream_obj); ant_value_t normalized = 0; if (!is_object_type(state)) return js_false; if (js_truthy(js, js_get(js, stream_obj, "destroyed"))) return js_false; if (is_null(chunk)) { js_set(js, state, "ended", js_true); stream_readable_flush(js, stream_obj); return js_false; } normalized = stream_normalize_chunk( js, chunk, js_truthy(js, js_get(js, state, "objectMode")), is_undefined(encoding) ? js_mkstr(js, "utf8", 4) : encoding ); if (is_err(normalized)) return normalized; stream_buffer_push(js, stream_obj, normalized); if (js_truthy(js, js_get(js, state, "flowing"))) stream_readable_flush(js, stream_obj); return js_bool(js_truthy(js, js_get(js, state, "flowing"))); } static ant_value_t js_readable_push(ant_t *js, ant_value_t *args, int nargs) { ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Readable"); ant_value_t chunk = nargs > 0 ? args[0] : js_mkundef(); ant_value_t encoding = nargs > 1 ? 
args[1] : js_mkundef(); if (is_err(stream_obj)) return stream_obj; return stream_readable_push_value(js, stream_obj, chunk, encoding); } static ant_value_t js_readable_read(ant_t *js, ant_value_t *args, int nargs) { ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Readable"); ant_value_t state = 0; ant_value_t chunk = 0; if (is_err(stream_obj)) return stream_obj; state = stream_readable_state(js, stream_obj); if (!is_object_type(state)) return js_mknull(); if (stream_readable_buffer_len(js, stream_obj) == 0) stream_readable_maybe_read(js, stream_obj); if (stream_readable_buffer_len(js, stream_obj) == 0) return js_mknull(); chunk = stream_buffer_shift(js, stream_obj); chunk = stream_readable_decode_chunk(js, stream_obj, chunk, false); if (is_err(chunk)) return chunk; if (js_truthy(js, js_get(js, state, "flowing"))) stream_readable_flush(js, stream_obj); return chunk; } static ant_value_t js_readable_set_encoding(ant_t *js, ant_value_t *args, int nargs) { ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Readable"); ant_value_t state = 0; ant_value_t decoder = 0; ant_value_t encoding = nargs > 0 && !is_undefined(args[0]) ? 
args[0] : js_mkstr(js, "utf8", 4); ant_value_t encoding_str = 0; if (is_err(stream_obj)) return stream_obj; state = stream_readable_state(js, stream_obj); if (!is_object_type(state)) return stream_obj; decoder = string_decoder_create(js, encoding); if (is_err(decoder)) return decoder; encoding_str = js_tostring_val(js, encoding); if (is_err(encoding_str)) return encoding_str; js_set(js, state, "decoder", decoder); js_set(js, stream_obj, "encoding", encoding_str); js_set(js, stream_obj, "readableEncoding", encoding_str); return stream_obj; } static ant_value_t js_readable_on(ant_t *js, ant_value_t *args, int nargs) { ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Readable"); ant_value_t key = 0; ant_value_t state = 0; if (is_err(stream_obj)) return stream_obj; if (nargs < 2) return js_mkerr(js, "on requires 2 arguments (event, listener)"); key = stream_event_key(js, args[0]); if (is_err(key)) return key; if (!eventemitter_add_listener_val(js, stream_obj, key, args[1], false)) return js_mkerr(js, "listener must be a function"); if (stream_key_is_cstr(js, key, "data")) { state = stream_readable_state(js, stream_obj); if (is_object_type(state)) js_set(js, state, "flowing", js_true); stream_schedule_microtask(js, stream_readable_start_flowing, stream_obj); } return stream_obj; } static ant_value_t js_readable_resume(ant_t *js, ant_value_t *args, int nargs) { ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Readable"); ant_value_t state = 0; if (is_err(stream_obj)) return stream_obj; state = stream_readable_state(js, stream_obj); if (is_object_type(state)) js_set(js, state, "flowing", js_true); js_stream_resume(js, NULL, 0); stream_readable_maybe_read(js, stream_obj); stream_readable_flush(js, stream_obj); return stream_obj; } static ant_value_t js_readable_pause(ant_t *js, ant_value_t *args, int nargs) { ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Readable"); ant_value_t state = 0; if (is_err(stream_obj)) return 
stream_obj; state = stream_readable_state(js, stream_obj); if (is_object_type(state)) js_set(js, state, "flowing", js_false); return js_stream_pause(js, args, nargs); } static ant_value_t js_writable__write(ant_t *js, ant_value_t *args, int nargs) { ant_value_t callback = nargs > 2 ? args[2] : js_mkundef(); stream_call_callback(js, callback, NULL, 0); return js_mkundef(); } static ant_value_t js_writable__final(ant_t *js, ant_value_t *args, int nargs) { ant_value_t callback = nargs > 0 ? args[0] : js_mkundef(); stream_call_callback(js, callback, NULL, 0); return js_mkundef(); } static ant_value_t stream_writable_write_done(ant_t *js, ant_value_t *args, int nargs) { ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); ant_value_t stream_obj = js_get(js, state_obj, "stream"); ant_value_t callback = js_get(js, state_obj, "callback"); stream_private_state_t *priv = stream_private_state(stream_obj); ant_value_t err = nargs > 0 ? args[0] : js_mkundef(); if (priv) priv->writing = false; if (!is_undefined(err) && !is_null(err)) { ant_value_t destroy_args[1] = { err }; js_set(js, state_obj, "done", js_true); if (priv) priv->pending_final = false; { ant_value_t saved_this = js->this_val; js->this_val = stream_obj; js_stream_destroy(js, destroy_args, 1); js->this_val = saved_this; } if (is_callable(callback)) stream_call_callback(js, callback, &err, 1); return js_mkundef(); } if (is_callable(callback)) stream_call_callback(js, callback, NULL, 0); stream_emit_named(js, stream_obj, "drain"); if (priv && priv->pending_final && !priv->final_started) { ant_value_t end_callback = js_get_slot(stream_obj, SLOT_AUX); priv->pending_final = false; stream_set_end_callback(js, stream_obj, js_mkundef()); return stream_writable_begin_end(js, stream_obj, end_callback); } return js_mkundef(); } static ant_value_t stream_writable_write_impl( ant_t *js, ant_value_t stream_obj, ant_value_t chunk, ant_value_t encoding, ant_value_t callback, bool allow_after_end ) { ant_value_t 
state = 0; ant_value_t normalized = 0; ant_value_t write_fn = 0; ant_value_t done_state = 0; ant_value_t done = 0; ant_value_t write_args[3]; state = stream_writable_state(js, stream_obj); if (!is_object_type(state)) return js_false; if ( (!allow_after_end && js_truthy(js, js_get(js, stream_obj, "writableEnded"))) || js_truthy(js, js_get(js, stream_obj, "destroyed")) ) { ant_value_t err = js_mkerr(js, "write after end"); if (is_callable(callback)) stream_call_callback(js, callback, &err, 1); else stream_emit_error(js, stream_obj, err); return js_false; } normalized = stream_normalize_chunk( js, chunk, js_truthy(js, js_get(js, state, "objectMode")), encoding ); if (is_err(normalized)) return normalized; done_state = js_mkobj(js); js_set(js, done_state, "stream", stream_obj); js_set(js, done_state, "callback", callback); js_set(js, done_state, "done", js_false); done = stream_make_once(js, js_heavy_mkfun(js, stream_writable_write_done, done_state), js_mkundef()); write_fn = js_getprop_fallback(js, stream_obj, "_write"); stream_private_state_t *priv = stream_private_state(stream_obj); if (priv) priv->writing = true; write_args[0] = normalized; write_args[1] = encoding; write_args[2] = done; if (is_callable(write_fn)) { ant_value_t result = stream_call(js, write_fn, stream_obj, write_args, 3, false); if (is_err(result)) { ant_value_t err_args[1] = { result }; stream_call_callback(js, done, err_args, 1); return js_false; }} return js_bool(!js_truthy(js, js_get(js, stream_obj, "destroyed"))); } static ant_value_t js_writable_write(ant_t *js, ant_value_t *args, int nargs) { ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Writable"); ant_value_t callback = js_mkundef(); ant_value_t encoding = js_mkstr(js, "utf8", 4); if (is_err(stream_obj)) return stream_obj; if (nargs > 1 && is_callable(args[1])) callback = args[1]; else if (nargs > 1 && !is_undefined(args[1])) encoding = args[1]; if (nargs > 2 && is_callable(args[2])) callback = args[2]; return 
stream_writable_write_impl( js, stream_obj, nargs > 0 ? args[0] : js_mkundef(), encoding, callback, false ); } static ant_value_t stream_writable_end_done(ant_t *js, ant_value_t *args, int nargs) { ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); ant_value_t stream_obj = js_get(js, state_obj, "stream"); ant_value_t callback = js_get(js, state_obj, "callback"); ant_value_t err = nargs > 0 ? args[0] : js_mkundef(); if (!is_undefined(err) && !is_null(err)) { ant_value_t destroy_args[1] = { err }; ant_value_t saved_this = js->this_val; js->this_val = stream_obj; js_stream_destroy(js, destroy_args, 1); js->this_val = saved_this; if (is_callable(callback)) stream_call_callback(js, callback, &err, 1); return js_mkundef(); } js_set(js, stream_obj, "writableFinished", js_true); js_set(js, stream_writable_state(js, stream_obj), "finished", js_true); stream_emit_named(js, stream_obj, "finish"); if (is_callable(callback)) stream_call_callback(js, callback, NULL, 0); if (!js_truthy(js, js_get(js, stream_obj, "readable"))) stream_emit_named(js, stream_obj, "close"); return js_mkundef(); } ant_value_t stream_writable_begin_end(ant_t *js, ant_value_t stream_obj, ant_value_t callback) { ant_value_t final_fn = 0; ant_value_t final_args[1]; ant_value_t done_state = 0; ant_value_t done = 0; stream_private_state_t *priv = stream_private_state(stream_obj); done_state = js_mkobj(js); js_set(js, done_state, "stream", stream_obj); js_set(js, done_state, "callback", callback); done = stream_make_once(js, js_heavy_mkfun(js, stream_writable_end_done, done_state), js_mkundef()); if (priv) { priv->final_started = true; priv->pending_final = false; } stream_set_end_callback(js, stream_obj, js_mkundef()); final_fn = js_getprop_fallback(js, stream_obj, "_final"); final_args[0] = done; if (is_callable(final_fn)) stream_call(js, final_fn, stream_obj, final_args, 1, false); else stream_call_callback(js, done, NULL, 0); return stream_obj; } static ant_value_t 
stream_writable_end_after_write(ant_t *js, ant_value_t *args, int nargs) {
  // SLOT_DATA carries the stream ("stream") and the end() callback
  // ("callback") captured by js_writable_end.
  ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  ant_value_t stream_obj = js_get(js, state_obj, "stream");
  ant_value_t callback = js_get(js, state_obj, "callback");
  stream_private_state_t *priv = stream_private_state(stream_obj);
  ant_value_t err = nargs > 0 ? args[0] : js_mkundef();

  if (!is_undefined(err) && !is_null(err)) {
    ant_value_t destroy_args[1] = { err };
    ant_value_t saved_this = js->this_val;
    js->this_val = stream_obj;
    js_stream_destroy(js, destroy_args, 1);
    js->this_val = saved_this;
    if (is_callable(callback)) stream_call_callback(js, callback, &err, 1);
    return js_mkundef();
  }

  if (priv) priv->pending_final = false;
  stream_set_end_callback(js, stream_obj, js_mkundef());
  return stream_writable_begin_end(js, stream_obj, callback);
}

// Writable.prototype.end([chunk][, encoding][, callback]).
//
// Marks the stream as ended and then either
//  - writes the final chunk and finishes from its completion callback,
//  - defers finishing until an in-flight write completes, or
//  - starts the end sequence immediately.
// Calling end() on an already ended stream only invokes the callback.
// Returns the stream, or an error value for an invalid receiver or a final
// chunk that cannot be normalized.
static ant_value_t js_writable_end(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Writable");
  ant_value_t callback = js_mkundef();
  ant_value_t chunk = js_mkundef();
  ant_value_t encoding = js_mkundef();
  ant_value_t after_write_state = 0;
  ant_value_t after_write = 0;
  ant_value_t wstate = 0;
  stream_private_state_t *priv = NULL;

  if (is_err(stream_obj)) return stream_obj;

  if (nargs > 0 && is_callable(args[0])) callback = args[0];
  else {
    if (nargs > 0) chunk = args[0];
    if (nargs > 1 && is_callable(args[1])) callback = args[1];
    else if (nargs > 1) encoding = args[1];
    if (nargs > 2 && is_callable(args[2])) callback = args[2];
  }

  if (js_truthy(js, js_get(js, stream_obj, "writableEnded"))) {
    if (is_callable(callback)) stream_call_callback(js, callback, NULL, 0);
    return stream_obj;
  }

  js_set(js, stream_obj, "writableEnded", js_true);
  // Guard the state object like the other state writers in this file.
  wstate = stream_writable_state(js, stream_obj);
  if (is_object_type(wstate)) js_set(js, wstate, "ended", js_true);
  priv = stream_private_state(stream_obj);

  if (!is_undefined(chunk) && !is_null(chunk)) {
    ant_value_t write_result = 0;

    // end(chunk) must treat the chunk exactly like write(chunk), which
    // defaults the encoding to "utf8" when none is supplied.
    if (is_undefined(encoding)) encoding = js_mkstr(js, "utf8", 4);

    after_write_state = js_mkobj(js);
    js_set(js, after_write_state, "stream", stream_obj);
    js_set(js, after_write_state, "callback", callback);
    after_write = stream_make_once(
      js, js_heavy_mkfun(js, stream_writable_end_after_write, after_write_state), js_mkundef()
    );

    // A chunk that fails normalization never reaches `after_write`;
    // surface that error to the caller instead of dropping it.
    write_result = stream_writable_write_impl(js, stream_obj, chunk, encoding, after_write, true);
    if (is_err(write_result)) return write_result;
    return stream_obj;
  }

  if (priv && priv->writing && !priv->final_started) {
    // A write is still in flight: stream_writable_write_done picks up the
    // stored callback and begins the end sequence once it completes.
    priv->pending_final = true;
    stream_set_end_callback(js, stream_obj, callback);
    return stream_obj;
  }

  return stream_writable_begin_end(js, stream_obj, callback);
}

// Default Transform.prototype._transform(chunk, encoding, callback): passes
// the chunk through unchanged by calling callback(null, chunk).
static ant_value_t js_transform__transform(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t callback = nargs > 2 ? args[2] : js_mkundef();
  ant_value_t cb_args[2];
  cb_args[0] = js_mknull();
  cb_args[1] = nargs > 0 ? args[0] : js_mkundef();
  stream_call_callback(js, callback, cb_args, 2);
  return js_mkundef();
}

// Callback given to `_transform`. SLOT_DATA carries the stream ("stream")
// and the writable-side completion callback ("callback").
// callback(err) forwards the error; callback(null, data) pushes `data` to
// the readable side before signalling completion.
static ant_value_t stream_transform_write_callback(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  ant_value_t stream_obj = js_get(js, state_obj, "stream");
  ant_value_t outer_callback = js_get(js, state_obj, "callback");

  if (nargs > 0 && !is_null(args[0]) && !is_undefined(args[0])) {
    if (is_callable(outer_callback)) stream_call(js, outer_callback, stream_obj, &args[0], 1, false);
    return js_mkundef();
  }

  if (nargs > 1 && !is_null(args[1]) && !is_undefined(args[1]))
    stream_readable_push_value(js, stream_obj, args[1], js_mkundef());
  if (is_callable(outer_callback)) stream_call(js, outer_callback, stream_obj, NULL, 0, false);
  return js_mkundef();
}

// Transform.prototype._write(chunk, encoding, callback): runs the stream's
// `_transform` with a callback that feeds the readable side.
static ant_value_t js_transform__write(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Transform");
  ant_value_t transform_fn = 0;
  ant_value_t cb_state = 0;
  ant_value_t cb = 0;
  ant_value_t call_args[3];
  if (is_err(stream_obj)) return stream_obj;
  transform_fn = js_getprop_fallback(js, stream_obj, "_transform");
  cb_state = js_mkobj(js);
  js_set(js, cb_state, "stream", stream_obj);
  js_set(js, cb_state, "callback", nargs > 2 ?
args[2] : js_mkundef()); cb = js_heavy_mkfun(js, stream_transform_write_callback, cb_state); call_args[0] = nargs > 0 ? args[0] : js_mkundef(); call_args[1] = nargs > 1 ? args[1] : js_mkstr(js, "utf8", 4); call_args[2] = cb; return stream_call(js, transform_fn, stream_obj, call_args, 3, false); } static ant_value_t stream_transform_final_callback(ant_t *js, ant_value_t *args, int nargs) { ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); ant_value_t stream_obj = js_get(js, state_obj, "stream"); ant_value_t callback = js_get(js, state_obj, "callback"); if (nargs > 0 && !is_null(args[0]) && !is_undefined(args[0])) { if (is_callable(callback)) stream_call(js, callback, stream_obj, &args[0], 1, false); return js_mkundef(); } if (nargs > 1 && !is_null(args[1]) && !is_undefined(args[1])) stream_readable_push_value(js, stream_obj, args[1], js_mkundef()); stream_readable_push_value(js, stream_obj, js_mknull(), js_mkundef()); if (is_callable(callback)) stream_call(js, callback, stream_obj, NULL, 0, false); return js_mkundef(); } static ant_value_t js_transform__final(ant_t *js, ant_value_t *args, int nargs) { ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Transform"); ant_value_t flush_fn = 0; ant_value_t cb_state = 0; ant_value_t cb = 0; ant_value_t call_args[1]; if (is_err(stream_obj)) return stream_obj; flush_fn = js_getprop_fallback(js, stream_obj, "_flush"); if (!is_callable(flush_fn)) { stream_readable_push_value(js, stream_obj, js_mknull(), js_mkundef()); stream_call_callback(js, nargs > 0 ? args[0] : js_mkundef(), NULL, 0); return js_mkundef(); } cb_state = js_mkobj(js); js_set(js, cb_state, "stream", stream_obj); js_set(js, cb_state, "callback", nargs > 0 ? 
args[0] : js_mkundef()); cb = js_heavy_mkfun(js, stream_transform_final_callback, cb_state); call_args[0] = cb; return stream_call(js, flush_fn, stream_obj, call_args, 1, false); } static ant_value_t js_passthrough__transform(ant_t *js, ant_value_t *args, int nargs) { return js_transform__transform(js, args, nargs); } static ant_value_t stream_finished_cleanup(ant_t *js, ant_value_t state_obj) { ant_value_t stream_obj = js_get(js, state_obj, "stream"); if (stream_is_instance(stream_obj) || is_object_type(stream_obj)) { stream_remove_listener(js, stream_obj, "end", js_get(js, state_obj, "onFinish")); stream_remove_listener(js, stream_obj, "finish", js_get(js, state_obj, "onFinish")); stream_remove_listener(js, stream_obj, "close", js_get(js, state_obj, "onFinish")); stream_remove_listener(js, stream_obj, "error", js_get(js, state_obj, "onError")); } return js_mkundef(); } static ant_value_t stream_finished_fire(ant_t *js, ant_value_t state_obj, ant_value_t error) { ant_value_t called = js_get(js, state_obj, "called"); ant_value_t callback = js_get(js, state_obj, "callback"); ant_value_t cb_args[1]; if (js_truthy(js, called)) return js_mkundef(); js_set(js, state_obj, "called", js_true); stream_finished_cleanup(js, state_obj); if (is_undefined(error)) stream_call_callback(js, callback, NULL, 0); else { cb_args[0] = error; stream_call_callback(js, callback, cb_args, 1); } return js_mkundef(); } static ant_value_t stream_finished_on_finish(ant_t *js, ant_value_t *args, int nargs) { ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); return stream_finished_fire(js, state_obj, js_mkundef()); } static ant_value_t stream_finished_on_error(ant_t *js, ant_value_t *args, int nargs) { ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); ant_value_t error = nargs > 0 ? 
args[0] : js_mkundef(); return stream_finished_fire(js, state_obj, error); } static ant_value_t stream_finished_register(ant_t *js, ant_value_t stream_obj, ant_value_t callback) { ant_value_t state_obj = js_mkobj(js); ant_value_t on_finish = 0; ant_value_t on_error = 0; if (!is_callable(callback)) callback = js_mkfun(stream_noop); js_set(js, state_obj, "stream", stream_obj); js_set(js, state_obj, "callback", callback); js_set(js, state_obj, "called", js_false); on_finish = js_heavy_mkfun(js, stream_finished_on_finish, state_obj); on_error = js_heavy_mkfun(js, stream_finished_on_error, state_obj); js_set(js, state_obj, "onFinish", on_finish); js_set(js, state_obj, "onError", on_error); eventemitter_add_listener(js, stream_obj, "end", on_finish, false); eventemitter_add_listener(js, stream_obj, "finish", on_finish, false); eventemitter_add_listener(js, stream_obj, "close", on_finish, false); eventemitter_add_listener(js, stream_obj, "error", on_error, false); return stream_obj; } static ant_value_t js_stream_finished(ant_t *js, ant_value_t *args, int nargs) { ant_value_t callback = nargs > 1 ? 
args[1] : js_mkundef(); if (nargs < 1 || !is_object_type(args[0])) return js_mkerr(js, "finished requires a stream"); return stream_finished_register(js, args[0], callback); } static ant_value_t stream_pipeline_done(ant_t *js, ant_value_t *args, int nargs) { ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); ant_value_t callback = js_get(js, state_obj, "callback"); ant_value_t called = js_get(js, state_obj, "called"); ant_value_t cb_args[1]; if (js_truthy(js, called)) return js_mkundef(); js_set(js, state_obj, "called", js_true); if (nargs > 0 && !is_undefined(args[0])) { cb_args[0] = args[0]; stream_call_callback(js, callback, cb_args, 1); } else stream_call_callback(js, callback, NULL, 0); return js_mkundef(); } static ant_value_t stream_pipeline_error(ant_t *js, ant_value_t *args, int nargs) { ant_value_t done = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); if (nargs > 0 && !is_undefined(args[0])) stream_call_callback(js, done, &args[0], 1); return js_mkundef(); } static ant_value_t stream_pipeline_schedule_done(ant_t *js, ant_value_t *args, int nargs) { ant_value_t done = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); stream_call_callback(js, done, NULL, 0); return js_mkundef(); } static ant_value_t js_stream_pipeline(ant_t *js, ant_value_t *args, int nargs) { int stream_count = nargs; ant_value_t callback = js_mkundef(); ant_value_t done_state = 0; ant_value_t done = 0; if (nargs > 0 && is_callable(args[nargs - 1])) { callback = args[nargs - 1]; stream_count--; } if (!is_callable(callback)) callback = js_mkfun(stream_noop); if (stream_count <= 0) return js_mkundef(); done_state = js_mkobj(js); js_set(js, done_state, "callback", callback); js_set(js, done_state, "called", js_false); done = js_heavy_mkfun(js, stream_pipeline_done, done_state); if (stream_count < 2) { stream_schedule_microtask(js, stream_pipeline_schedule_done, done); return args[0]; } for (int i = 0; i < stream_count - 1; i++) { ant_value_t error_cb = js_heavy_mkfun(js, 
stream_pipeline_error, done); ant_value_t finished_args[2]; finished_args[0] = args[i]; finished_args[1] = error_cb; js_stream_finished(js, finished_args, 2); ant_value_t pipe_args[2]; pipe_args[0] = args[i + 1]; pipe_args[1] = js_mkundef(); stream_call_prop(js, args[i], "pipe", pipe_args, 2); } { ant_value_t finished_args[2]; finished_args[0] = args[stream_count - 1]; finished_args[1] = done; js_stream_finished(js, finished_args, 2); } return args[stream_count - 1]; } static ant_value_t stream_promise_callback(ant_t *js, ant_value_t *args, int nargs) { ant_value_t promise = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); if (nargs > 0 && !is_undefined(args[0])) js_reject_promise(js, promise, args[0]); else js_resolve_promise(js, promise, js_mkundef()); return js_mkundef(); } static ant_value_t js_stream_promises_finished(ant_t *js, ant_value_t *args, int nargs) { ant_value_t promise = js_mkpromise(js); ant_value_t finished_args[2]; if (nargs < 1 || !is_object_type(args[0])) { js_reject_promise(js, promise, js_mkerr(js, "finished requires a stream")); return promise; } finished_args[0] = args[0]; finished_args[1] = js_heavy_mkfun(js, stream_promise_callback, promise); js_stream_finished(js, finished_args, 2); return promise; } static ant_value_t js_stream_promises_pipeline(ant_t *js, ant_value_t *args, int nargs) { ant_value_t promise = js_mkpromise(js); ant_value_t *call_args = NULL; if (nargs <= 0) { js_resolve_promise(js, promise, js_mkundef()); return promise; } call_args = malloc((size_t)(nargs + 1) * sizeof(*call_args)); if (!call_args) { js_reject_promise(js, promise, js_mkerr(js, "out of memory")); return promise; } for (int i = 0; i < nargs; i++) call_args[i] = args[i]; call_args[nargs] = js_heavy_mkfun(js, stream_promise_callback, promise); js_stream_pipeline(js, call_args, nargs + 1); free(call_args); return promise; } static void stream_release_reader(ant_t *js, ant_value_t state_obj) { ant_value_t reader = js_get(js, state_obj, "reader"); if 
(!is_object_type(reader)) return; stream_call_prop(js, reader, "releaseLock", NULL, 0); } static ant_value_t stream_readable_from_step(ant_t *js, ant_value_t *args, int nargs); static void stream_readable_from_schedule(ant_t *js, ant_value_t state_obj) { stream_schedule_microtask(js, stream_readable_from_step, state_obj); } static ant_value_t stream_readable_from_fail(ant_t *js, ant_value_t state_obj, ant_value_t error) { ant_value_t readable = js_get(js, state_obj, "readable"); stream_release_reader(js, state_obj); if (stream_is_instance(readable)) { ant_value_t destroy_args[1] = { error }; ant_value_t saved_this = js->this_val; js->this_val = readable; js_stream_destroy(js, destroy_args, 1); js->this_val = saved_this; } return js_mkundef(); } static ant_value_t stream_readable_from_handle_result(ant_t *js, ant_value_t *args, int nargs) { ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); ant_value_t readable = js_get(js, state_obj, "readable"); ant_value_t result = nargs > 0 ? args[0] : js_mkundef(); ant_value_t done = 0; ant_value_t value = 0; if (js_truthy(js, js_get(js, readable, "destroyed"))) { stream_release_reader(js, state_obj); return js_mkundef(); } if (!is_object_type(result)) return stream_readable_from_fail(js, state_obj, js_mkerr(js, "iterator step must be an object")); done = js_get(js, result, "done"); value = js_get(js, result, "value"); if (js_truthy(js, done)) { stream_release_reader(js, state_obj); stream_readable_push_value(js, readable, js_mknull(), js_mkundef()); return js_mkundef(); } stream_readable_push_value(js, readable, value, js_mkundef()); if (!js_truthy(js, js_get(js, readable, "destroyed"))) stream_readable_from_schedule(js, state_obj); return js_mkundef(); } static ant_value_t stream_readable_from_reject(ant_t *js, ant_value_t *args, int nargs) { ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); ant_value_t error = nargs > 0 ? 
args[0] : js_mkerr(js, "stream iteration failed"); return stream_readable_from_fail(js, state_obj, error); } static ant_value_t stream_readable_from_step(ant_t *js, ant_value_t *args, int nargs) { ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); ant_value_t readable = js_get(js, state_obj, "readable"); ant_value_t mode = js_get(js, state_obj, "mode"); ant_value_t iterator = js_get(js, state_obj, "iterator"); ant_value_t next_result = js_mkundef(); ant_value_t on_resolve = js_heavy_mkfun(js, stream_readable_from_handle_result, state_obj); ant_value_t on_reject = js_heavy_mkfun(js, stream_readable_from_reject, state_obj); ant_value_t then_result = 0; if (js_truthy(js, js_get(js, readable, "destroyed"))) { stream_release_reader(js, state_obj); return js_mkundef(); } if (stream_key_is_cstr(js, mode, "reader")) next_result = stream_call_prop(js, iterator, "read", NULL, 0); else next_result = stream_call_prop(js, iterator, "next", NULL, 0); if (is_err(next_result)) return stream_readable_from_fail(js, state_obj, next_result); if (vtype(next_result) == T_PROMISE) { then_result = js_promise_then(js, next_result, on_resolve, on_reject); promise_mark_handled(then_result); return js_mkundef(); } ant_value_t one_arg[1] = { next_result }; return stream_readable_from_handle_result(js, one_arg, 1); } static ant_value_t stream_readable_from_start(ant_t *js, ant_value_t *args, int nargs) { ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); ant_value_t readable = js_get(js, state_obj, "readable"); ant_value_t source = js_get(js, state_obj, "source"); ant_value_t async_iter_fn = 0; ant_value_t reader_fn = 0; js_iter_t it; if (js_truthy(js, js_get(js, readable, "destroyed"))) return js_mkundef(); async_iter_fn = is_object_type(source) ? 
js_get_sym(js, source, get_asyncIterator_sym()) : js_mkundef(); if (is_callable(async_iter_fn)) { ant_value_t iterator = stream_call(js, async_iter_fn, source, NULL, 0, false); if (is_err(iterator)) return stream_readable_from_fail(js, state_obj, iterator); js_set(js, state_obj, "iterator", iterator); js_set(js, state_obj, "mode", js_mkstr(js, "async", 5)); stream_readable_from_schedule(js, state_obj); return js_mkundef(); } if (js_iter_open(js, source, &it)) { ant_value_t value = 0; while (js_iter_next(js, &it, &value)) { if (js_truthy(js, js_get(js, readable, "destroyed"))) break; stream_readable_push_value(js, readable, value, js_mkundef()); } js_iter_close(js, &it); if (!js_truthy(js, js_get(js, readable, "destroyed"))) stream_readable_push_value(js, readable, js_mknull(), js_mkundef()); return js_mkundef(); } reader_fn = is_object_type(source) ? js_get(js, source, "getReader") : js_mkundef(); if (is_callable(reader_fn)) { ant_value_t reader = stream_call(js, reader_fn, source, NULL, 0, false); if (is_err(reader)) return stream_readable_from_fail(js, state_obj, reader); js_set(js, state_obj, "reader", reader); js_set(js, state_obj, "iterator", reader); js_set(js, state_obj, "mode", js_mkstr(js, "reader", 6)); stream_readable_from_schedule(js, state_obj); return js_mkundef(); } if (!is_undefined(source)) stream_readable_push_value(js, readable, source, js_mkundef()); stream_readable_push_value(js, readable, js_mknull(), js_mkundef()); return js_mkundef(); } static ant_value_t js_readable_from(ant_t *js, ant_value_t *args, int nargs) { ant_value_t ctor_args[1]; ant_value_t readable = 0; ant_value_t state_obj = 0; ctor_args[0] = nargs > 1 ? args[1] : js_mkundef(); readable = stream_construct(js, g_readable_proto, ctor_args[0], stream_init_readable); if (is_err(readable)) return readable; state_obj = js_mkobj(js); js_set(js, state_obj, "readable", readable); js_set(js, state_obj, "source", nargs > 0 ? 
args[0] : js_mkundef()); js_set(js, state_obj, "iterator", js_mkundef()); js_set(js, state_obj, "reader", js_mkundef()); js_set(js, state_obj, "mode", js_mkundef()); stream_schedule_microtask(js, stream_readable_from_start, state_obj); return readable; } static ant_value_t js_readable_from_web(ant_t *js, ant_value_t *args, int nargs) { return js_readable_from(js, args, nargs); } static ant_value_t js_stream_ctor(ant_t *js, ant_value_t *args, int nargs) { return stream_construct(js, g_stream_proto, nargs > 0 ? args[0] : js_mkundef(), stream_init_base); } static ant_value_t js_readable_ctor(ant_t *js, ant_value_t *args, int nargs) { return stream_construct(js, g_readable_proto, nargs > 0 ? args[0] : js_mkundef(), stream_init_readable); } static ant_value_t js_writable_ctor(ant_t *js, ant_value_t *args, int nargs) { return stream_construct(js, g_writable_proto, nargs > 0 ? args[0] : js_mkundef(), stream_init_writable); } static ant_value_t js_duplex_ctor(ant_t *js, ant_value_t *args, int nargs) { ant_value_t proto = js_instance_proto_from_new_target(js, g_duplex_proto); ant_value_t obj = stream_make_base_object(js, is_object_type(proto) ? proto : g_duplex_proto); ant_value_t options = nargs > 0 ? args[0] : js_mkundef(); ant_value_t options_obj = is_object_type(options) ? options : js_mkobj(js); stream_init_readable(js, obj, options); stream_init_writable(js, obj, options); js_set(js, obj, "readable", js_true); js_set(js, obj, "writable", js_true); js_set(js, obj, "allowHalfOpen", js_bool(stream_get_option(js, options_obj, "allowHalfOpen") != js_false)); return obj; } static ant_value_t js_transform_ctor(ant_t *js, ant_value_t *args, int nargs) { ant_value_t proto = js_instance_proto_from_new_target(js, g_transform_proto); ant_value_t obj = stream_make_base_object(js, is_object_type(proto) ? proto : g_transform_proto); ant_value_t options = nargs > 0 ? args[0] : js_mkundef(); ant_value_t options_obj = is_object_type(options) ? 
options : js_mkobj(js);
  ant_value_t transform_fn = 0;
  ant_value_t flush_fn = 0;

  stream_init_readable(js, obj, options);
  stream_init_writable(js, obj, options);
  js_set(js, obj, "readable", js_true);
  js_set(js, obj, "writable", js_true);
  // allowHalfOpen is true unless the option is exactly js_false.
  js_set(js, obj, "allowHalfOpen", js_bool(stream_get_option(js, options_obj, "allowHalfOpen") != js_false));

  // Callable `transform` / `flush` options become own `_transform` /
  // `_flush` properties of the instance.
  transform_fn = stream_get_option(js, options_obj, "transform");
  flush_fn = stream_get_option(js, options_obj, "flush");
  if (is_callable(transform_fn)) js_set(js, obj, "_transform", transform_fn);
  if (is_callable(flush_fn)) js_set(js, obj, "_flush", flush_fn);
  return obj;
}

// new PassThrough([options]): a base stream object initialized as both
// readable and writable. The prototype comes from new.target when that
// yields an object, otherwise the PassThrough prototype is used.
static ant_value_t js_passthrough_ctor(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t proto = js_instance_proto_from_new_target(js, g_passthrough_proto);
  ant_value_t obj = stream_make_base_object(js, is_object_type(proto) ? proto : g_passthrough_proto);
  ant_value_t options = nargs > 0 ? args[0] : js_mkundef();
  ant_value_t options_obj = is_object_type(options) ? options : js_mkobj(js);
  stream_init_readable(js, obj, options);
  stream_init_writable(js, obj, options);
  js_set(js, obj, "readable", js_true);
  js_set(js, obj, "writable", js_true);
  js_set(js, obj, "allowHalfOpen", js_bool(stream_get_option(js, options_obj, "allowHalfOpen") != js_false));
  return obj;
}

// Builds the prototypes and constructors for Stream, Readable, Writable,
// Duplex, Transform and PassThrough and stores them in the file-level
// g_*_proto / g_*_ctor globals. Does nothing when g_stream_ctor is already
// set, so it can be called from every public entry point.
//
// Prototype links set up below:
//   EventEmitter.prototype <- Stream <- Readable <- Duplex <- Transform <- PassThrough
//   Stream <- Writable
void stream_init_constructors(ant_t *js) {
  ant_value_t events = 0;
  ant_value_t ee_ctor = 0;
  ant_value_t ee_proto = 0;

  if (g_stream_ctor) return;

  events = events_library(js);
  ee_ctor = js_get(js, events, "EventEmitter");
  ee_proto = js_get(js, ee_ctor, "prototype");

  // Stream: the common base, linked to EventEmitter.prototype.
  g_stream_proto = js_mkobj(js);
  js_set_proto_init(g_stream_proto, ee_proto);
  js_set(js, g_stream_proto, "pipe", js_mkfun(js_stream_pipe));
  js_set(js, g_stream_proto, "unpipe", js_mkfun(js_stream_unpipe));
  js_set(js, g_stream_proto, "pause", js_mkfun(js_stream_pause));
  js_set(js, g_stream_proto, "resume", js_mkfun(js_stream_resume));
  js_set(js, g_stream_proto, "isPaused", js_mkfun(js_stream_is_paused));
  js_set(js, g_stream_proto, "destroy", js_mkfun(js_stream_destroy));
  js_set_sym(js, g_stream_proto, get_toStringTag_sym(), js_mkstr(js, "Stream", 6));
  g_stream_ctor = js_make_ctor(js, js_stream_ctor, g_stream_proto, "Stream", 6);

  // Readable: overrides on/resume/pause; from/fromWeb are set on the
  // constructor.
  g_readable_proto = js_mkobj(js);
  js_set_proto_init(g_readable_proto, g_stream_proto);
  js_set(js, g_readable_proto, "_read", js_mkfun(js_readable__read));
  js_set(js, g_readable_proto, "push", js_mkfun(js_readable_push));
  js_set(js, g_readable_proto, "read", js_mkfun(js_readable_read));
  js_set(js, g_readable_proto, "setEncoding", js_mkfun(js_readable_set_encoding));
  js_set(js, g_readable_proto, "on", js_mkfun(js_readable_on));
  js_set(js, g_readable_proto, "resume", js_mkfun(js_readable_resume));
  js_set(js, g_readable_proto, "pause", js_mkfun(js_readable_pause));
  js_set_sym(js, g_readable_proto, get_toStringTag_sym(), js_mkstr(js, "Readable", 8));
  g_readable_ctor = js_make_ctor(js, js_readable_ctor, g_readable_proto, "Readable", 8);
  js_set(js, g_readable_ctor, "from", js_mkfun(js_readable_from));
  js_set(js, g_readable_ctor, "fromWeb", js_mkfun(js_readable_from_web));

  // Writable: cork/uncork are registered as no-ops.
  g_writable_proto = js_mkobj(js);
  js_set_proto_init(g_writable_proto, g_stream_proto);
  js_set(js, g_writable_proto, "_write", js_mkfun(js_writable__write));
  js_set(js, g_writable_proto, "_final", js_mkfun(js_writable__final));
  js_set(js, g_writable_proto, "write", js_mkfun(js_writable_write));
  js_set(js, g_writable_proto, "end", js_mkfun(js_writable_end));
  js_set(js, g_writable_proto, "cork", js_mkfun(stream_noop));
  js_set(js, g_writable_proto, "uncork", js_mkfun(stream_noop));
  js_set_sym(js, g_writable_proto, get_toStringTag_sym(), js_mkstr(js, "Writable", 8));
  g_writable_ctor = js_make_ctor(js, js_writable_ctor, g_writable_proto, "Writable", 8);

  // Duplex: linked to the Readable prototype; the writable methods are
  // registered again on the Duplex prototype itself.
  g_duplex_proto = js_mkobj(js);
  js_set_proto_init(g_duplex_proto, g_readable_proto);
  js_set(js, g_duplex_proto, "_write", js_mkfun(js_writable__write));
  js_set(js, g_duplex_proto, "_final", js_mkfun(js_writable__final));
  js_set(js, g_duplex_proto, "write", js_mkfun(js_writable_write));
  js_set(js, g_duplex_proto, "end", js_mkfun(js_writable_end));
  js_set(js, g_duplex_proto, "cork", js_mkfun(stream_noop));
  js_set(js, g_duplex_proto, "uncork", js_mkfun(stream_noop));
  js_set_sym(js, g_duplex_proto, get_toStringTag_sym(), js_mkstr(js, "Duplex", 6));
  g_duplex_ctor = js_make_ctor(js, js_duplex_ctor, g_duplex_proto, "Duplex", 6);

  // Transform: replaces _write/_final so writes go through _transform and
  // ending goes through _flush.
  g_transform_proto = js_mkobj(js);
  js_set_proto_init(g_transform_proto, g_duplex_proto);
  js_set(js, g_transform_proto, "_transform", js_mkfun(js_transform__transform));
  js_set(js, g_transform_proto, "_write", js_mkfun(js_transform__write));
  js_set(js, g_transform_proto, "_final", js_mkfun(js_transform__final));
  js_set_sym(js, g_transform_proto, get_toStringTag_sym(), js_mkstr(js, "Transform", 9));
  g_transform_ctor = js_make_ctor(js, js_transform_ctor, g_transform_proto, "Transform", 9);

  // PassThrough: a Transform with its own _transform entry.
  g_passthrough_proto = js_mkobj(js);
  js_set_proto_init(g_passthrough_proto, g_transform_proto);
  js_set(js, g_passthrough_proto, "_transform", js_mkfun(js_passthrough__transform));
  js_set_sym(js, g_passthrough_proto, get_toStringTag_sym(), js_mkstr(js, "PassThrough", 11));
  g_passthrough_ctor = js_make_ctor(js, js_passthrough_ctor, g_passthrough_proto, "PassThrough", 11);

  // NOTE(review): the globals are registered as GC roots only after every
  // allocation above; presumably no collection can run during this
  // function — confirm against the GC's allocation behavior.
  gc_register_root(&g_stream_proto);
  gc_register_root(&g_stream_ctor);
  gc_register_root(&g_readable_proto);
  gc_register_root(&g_readable_ctor);
  gc_register_root(&g_writable_proto);
  gc_register_root(&g_writable_ctor);
  gc_register_root(&g_duplex_proto);
  gc_register_root(&g_duplex_ctor);
  gc_register_root(&g_transform_proto);
  gc_register_root(&g_transform_ctor);
  gc_register_root(&g_passthrough_proto);
  gc_register_root(&g_passthrough_ctor);
}

// Returns the Readable constructor, initializing the stream classes first.
ant_value_t stream_readable_constructor(ant_t *js) {
  stream_init_constructors(js);
  return g_readable_ctor;
}

// Returns the Writable constructor, initializing the stream classes first.
ant_value_t stream_writable_constructor(ant_t *js) {
  stream_init_constructors(js);
  return g_writable_ctor;
}

// Returns the Readable prototype, initializing the stream classes first.
ant_value_t stream_readable_prototype(ant_t *js) {
  stream_init_constructors(js);
  return g_readable_proto;
} ant_value_t stream_writable_prototype(ant_t *js) { stream_init_constructors(js); return g_writable_proto; } ant_value_t stream_duplex_prototype(ant_t *js) { stream_init_constructors(js); return g_duplex_proto; } ant_value_t stream_construct_readable(ant_t *js, ant_value_t base_proto, ant_value_t options) { stream_init_constructors(js); return stream_construct(js, base_proto, options, stream_init_readable); } ant_value_t stream_construct_writable(ant_t *js, ant_value_t base_proto, ant_value_t options) { stream_init_constructors(js); return stream_construct(js, base_proto, options, stream_init_writable); } void stream_init_readable_object(ant_t *js, ant_value_t obj, ant_value_t options) { stream_init_constructors(js); if (!is_object_type(obj)) return; js_set_native_tag(obj, STREAM_NATIVE_TAG); stream_init_readable(js, obj, options); } void stream_init_writable_object(ant_t *js, ant_value_t obj, ant_value_t options) { stream_init_constructors(js); if (!is_object_type(obj)) return; js_set_native_tag(obj, STREAM_NATIVE_TAG); stream_init_writable(js, obj, options); } void stream_init_duplex_object(ant_t *js, ant_value_t obj, ant_value_t options) { stream_init_constructors(js); if (!is_object_type(obj)) return; js_set_native_tag(obj, STREAM_NATIVE_TAG); stream_init_readable(js, obj, options); stream_init_writable(js, obj, options); } ant_value_t stream_readable_push(ant_t *js, ant_value_t stream_obj, ant_value_t chunk, ant_value_t encoding) { stream_init_constructors(js); return stream_readable_push_value(js, stream_obj, chunk, encoding); } static ant_value_t js_stream_get_default_high_water_mark(ant_t *js, ant_value_t *args, int nargs) { bool object_mode = nargs > 0 && js_truthy(js, args[0]); return js_mknum(stream_default_high_water_mark(object_mode)); } static ant_value_t js_stream_set_default_high_water_mark(ant_t *js, ant_value_t *args, int nargs) { if (nargs < 2 || vtype(args[1]) != T_NUM || js_getnum(args[1]) < 0) return js_mkerr_typed(js, JS_ERR_RANGE, 
"setDefaultHighWaterMark requires a non-negative number"); bool object_mode = js_truthy(js, args[0]); if (object_mode) g_default_object_high_water_mark = js_getnum(args[1]); else g_default_high_water_mark = js_getnum(args[1]); return js_mkundef(); } static void stream_web_copy_global(ant_t *js, ant_value_t obj, const char *name) { ant_value_t value = js_get(js, js->global, name); if (is_err(value)) return; js_set(js, obj, name, value); } // TODO: remove copy-on-start static void stream_web_define_common(ant_t *js, ant_value_t obj) { stream_web_copy_global(js, obj, "ReadableStream"); stream_web_copy_global(js, obj, "ReadableStreamDefaultReader"); stream_web_copy_global(js, obj, "ReadableStreamDefaultController"); stream_web_copy_global(js, obj, "WritableStream"); stream_web_copy_global(js, obj, "WritableStreamDefaultWriter"); stream_web_copy_global(js, obj, "WritableStreamDefaultController"); stream_web_copy_global(js, obj, "TransformStream"); stream_web_copy_global(js, obj, "TransformStreamDefaultController"); stream_web_copy_global(js, obj, "ByteLengthQueuingStrategy"); stream_web_copy_global(js, obj, "CountQueuingStrategy"); stream_web_copy_global(js, obj, "TextEncoderStream"); stream_web_copy_global(js, obj, "TextDecoderStream"); stream_web_copy_global(js, obj, "CompressionStream"); stream_web_copy_global(js, obj, "DecompressionStream"); } ant_value_t stream_library(ant_t *js) { ant_value_t lib = js_mkobj(js); ant_value_t promises = js_mkobj(js); stream_init_constructors(js); js_set(js, promises, "pipeline", js_mkfun(js_stream_promises_pipeline)); js_set(js, promises, "finished", js_mkfun(js_stream_promises_finished)); js_set_module_default(js, lib, g_stream_ctor, "Stream"); js_set(js, lib, "Readable", g_readable_ctor); js_set(js, lib, "Writable", g_writable_ctor); js_set(js, lib, "Duplex", g_duplex_ctor); js_set(js, lib, "Transform", g_transform_ctor); js_set(js, lib, "PassThrough", g_passthrough_ctor); js_set(js, lib, "pipeline", js_mkfun(js_stream_pipeline)); 
js_set(js, lib, "finished", js_mkfun(js_stream_finished)); js_set(js, lib, "getDefaultHighWaterMark", js_mkfun(js_stream_get_default_high_water_mark)); js_set(js, lib, "setDefaultHighWaterMark", js_mkfun(js_stream_set_default_high_water_mark)); js_set(js, lib, "promises", promises); js_set(js, g_stream_ctor, "Readable", g_readable_ctor); js_set(js, g_stream_ctor, "Writable", g_writable_ctor); js_set(js, g_stream_ctor, "Duplex", g_duplex_ctor); js_set(js, g_stream_ctor, "Transform", g_transform_ctor); js_set(js, g_stream_ctor, "PassThrough", g_passthrough_ctor); js_set(js, g_stream_ctor, "pipeline", js_get(js, lib, "pipeline")); js_set(js, g_stream_ctor, "finished", js_get(js, lib, "finished")); js_set(js, g_stream_ctor, "getDefaultHighWaterMark", js_get(js, lib, "getDefaultHighWaterMark")); js_set(js, g_stream_ctor, "setDefaultHighWaterMark", js_get(js, lib, "setDefaultHighWaterMark")); js_set(js, g_stream_ctor, "promises", promises); js_set(js, promises, "default", promises); js_set_slot_wb(js, promises, SLOT_DEFAULT, promises); js_set_sym(js, lib, get_toStringTag_sym(), js_mkstr(js, "stream", 6)); return lib; } ant_value_t stream_promises_library(ant_t *js) { ant_value_t stream_ns = js_esm_import_sync_cstr(js, "stream", 6); ant_value_t promises = 0; if (is_err(stream_ns)) return stream_ns; promises = js_get(js, stream_ns, "promises"); if (!is_object_type(promises)) return js_mkerr(js, "stream.promises is not available"); js_set(js, promises, "default", promises); js_set_slot_wb(js, promises, SLOT_DEFAULT, promises); return promises; } ant_value_t stream_web_library(ant_t *js) { ant_value_t lib = js_mkobj(js); stream_web_define_common(js, lib); js_set(js, lib, "default", lib); js_set_slot_wb(js, lib, SLOT_DEFAULT, lib); js_set_sym(js, lib, get_toStringTag_sym(), js_mkstr(js, "stream/web", 10)); return lib; }