MIRROR: javascript for 🐜's, a tiny runtime with big ambitions
1#include <stdlib.h>
2#include <string.h>
3
4#include "ant.h"
5#include "ptr.h"
6#include "internal.h"
7#include "silver/engine.h"
8#include "esm/loader.h"
9#include "gc/roots.h"
10
11#include "modules/assert.h"
12#include "modules/buffer.h"
13#include "modules/events.h"
14#include "modules/stream.h"
15#include "modules/symbol.h"
16#include "modules/string_decoder.h"
17
// Native tag checked by stream_is_instance(); the bytes 0x53 0x54 0x52 0x4D
// are the ASCII letters "STRM".
enum { STREAM_NATIVE_TAG = 0x5354524Du }; // STRM

// Prototype / constructor values for the base stream class. Both start at 0.
// NOTE(review): presumably assigned during module setup outside this view — confirm.
static ant_value_t g_stream_proto = 0;
static ant_value_t g_stream_ctor = 0;

// Prototype / constructor values for Readable.
static ant_value_t g_readable_proto = 0;
static ant_value_t g_readable_ctor = 0;

// Prototype / constructor values for Writable.
static ant_value_t g_writable_proto = 0;
static ant_value_t g_writable_ctor = 0;

// Prototype / constructor values for Duplex.
static ant_value_t g_duplex_proto = 0;
static ant_value_t g_duplex_ctor = 0;

// Prototype / constructor values for Transform.
static ant_value_t g_transform_proto = 0;
static ant_value_t g_transform_ctor = 0;

// Prototype / constructor values for PassThrough.
static ant_value_t g_passthrough_proto = 0;
static ant_value_t g_passthrough_ctor = 0;

// Fallback high-water marks returned by stream_default_high_water_mark():
// the first for non-object-mode streams, the second for object-mode streams.
static double g_default_high_water_mark = 16384.0;
static double g_default_object_high_water_mark = 16.0;
40
41static ant_value_t stream_noop(ant_t *js, ant_value_t *args, int nargs) {
42 return js_mkundef();
43}
44
45static bool stream_is_instance(ant_value_t value) {
46 return is_object_type(value) && js_check_native_tag(value, STREAM_NATIVE_TAG);
47}
48
49static inline void stream_set_end_callback(ant_t *js, ant_value_t stream_obj, ant_value_t callback) {
50 js_set_slot_wb(js, stream_obj, SLOT_AUX, callback);
51}
52
53static stream_private_state_t *stream_private_state(ant_value_t stream_obj) {
54 if (!stream_is_instance(stream_obj)) return NULL;
55 return (stream_private_state_t *)js_get_native_ptr(stream_obj);
56}
57
58static ant_value_t stream_require_this(ant_t *js, ant_value_t value, const char *label) {
59 if (!stream_is_instance(value))
60 return js_mkerr_typed(js, JS_ERR_TYPE, "Invalid %s", label);
61 return value;
62}
63
64static ant_value_t stream_truthy_or_object(ant_t *js, ant_value_t value) {
65 return js_truthy(js, value) ? value : js_mkobj(js);
66}
67
68static ant_value_t stream_readable_state(ant_t *js, ant_value_t stream_obj) {
69 return js_get(js, stream_obj, "_readableState");
70}
71
72static ant_value_t stream_writable_state(ant_t *js, ant_value_t stream_obj) {
73 return js_get(js, stream_obj, "_writableState");
74}
75
76static ant_value_t stream_pipes(ant_t *js, ant_value_t stream_obj) {
77 return js_get(js, stream_obj, "_pipes");
78}
79
80static bool stream_key_is_cstr(ant_t *js, ant_value_t value, const char *expected) {
81 size_t len = 0;
82 const char *s = NULL;
83 if (vtype(value) != T_STR) return false;
84 s = js_getstr(js, value, &len);
85 return s && len == strlen(expected) && memcmp(s, expected, len) == 0;
86}
87
88static ant_value_t stream_event_key(ant_t *js, ant_value_t value) {
89 uint8_t t = vtype(value);
90 if (t == T_STR || t == T_SYMBOL) return value;
91 return js_mkerr(js, "event must be a string or Symbol");
92}
93
94void *stream_get_attached_state(ant_value_t stream_obj) {
95 stream_private_state_t *priv = stream_private_state(stream_obj);
96 return priv ? priv->attached_state : NULL;
97}
98
99void stream_set_attached_state(
100 ant_value_t stream_obj,
101 void *state,
102 stream_finalize_fn finalize
103) {
104 stream_private_state_t *priv = stream_private_state(stream_obj);
105 if (!priv) return;
106 priv->attached_state = state;
107 priv->attached_state_finalize = finalize;
108}
109
110void stream_clear_attached_state(ant_value_t stream_obj) {
111 stream_private_state_t *priv = stream_private_state(stream_obj);
112 if (!priv) return;
113 priv->attached_state = NULL;
114 priv->attached_state_finalize = NULL;
115}
116
117static void stream_finalize(ant_t *js, ant_object_t *obj) {
118 ant_value_t stream_obj = js_obj_from_ptr(obj);
119 stream_private_state_t *priv = stream_private_state(stream_obj);
120 if (!priv) return;
121 js_set_native_ptr(stream_obj, NULL);
122 if (priv->attached_state && priv->attached_state_finalize)
123 priv->attached_state_finalize(js, stream_obj, priv->attached_state);
124 free(priv);
125}
126
127static ant_value_t stream_call(
128 ant_t *js,
129 ant_value_t fn,
130 ant_value_t this_val,
131 ant_value_t *args,
132 int nargs,
133 bool is_ctor
134) {
135 if (!is_callable(fn)) return js_mkundef();
136 if (sv_check_c_stack_overflow(js))
137 return js_mkerr_typed(js, JS_ERR_RANGE | JS_ERR_NO_STACK, "Maximum call stack size exceeded");
138
139 sv_call_mode_t mode = is_ctor ? SV_CALL_MODE_CONSTRUCT : SV_CALL_MODE_NORMAL;
140 sv_call_plan_t plan;
141 ant_value_t err = sv_prepare_call(js->vm, js, fn, this_val, args, nargs, NULL, mode, &plan);
142 if (is_err(err)) return err;
143
144 return sv_execute_call_plan(js->vm, js, &plan, NULL);
145}
146
147static ant_value_t stream_call_prop(
148 ant_t *js,
149 ant_value_t target,
150 const char *name,
151 ant_value_t *args,
152 int nargs
153) {
154 ant_value_t fn = js_getprop_fallback(js, target, name);
155 if (is_err(fn) || !is_callable(fn)) return js_mkundef();
156 return stream_call(js, fn, target, args, nargs, false);
157}
158
159static void stream_call_callback(ant_t *js, ant_value_t fn, ant_value_t *args, int nargs) {
160 if (!is_callable(fn)) return;
161 stream_call(js, fn, js_mkundef(), args, nargs, false);
162}
163
164static void stream_schedule_microtask(ant_t *js, ant_cfunc_t fn, ant_value_t data) {
165 ant_value_t promise = js_mkpromise(js);
166 ant_value_t cb = js_heavy_mkfun(js, fn, data);
167 ant_value_t then_result = 0;
168
169 js_resolve_promise(js, promise, js_mkundef());
170 then_result = js_promise_then(js, promise, cb, js_mkundef());
171 promise_mark_handled(then_result);
172}
173
174static ant_value_t stream_buffer_ctor(ant_t *js) {
175 ant_value_t ns = js_esm_import_sync_cstr(js, "buffer", 6);
176 if (is_err(ns)) return ns;
177 return js_get(js, ns, "Buffer");
178}
179
180static ant_value_t stream_readable_decoder(ant_t *js, ant_value_t stream_obj) {
181 ant_value_t state = stream_readable_state(js, stream_obj);
182 if (!is_object_type(state)) return js_mkundef();
183 return js_get(js, state, "decoder");
184}
185
186static ant_value_t stream_readable_decode_chunk(
187 ant_t *js, ant_value_t stream_obj,
188 ant_value_t chunk, bool flush
189) {
190 ant_value_t decoder = stream_readable_decoder(js, stream_obj);
191 if (!is_object_type(decoder)) return chunk;
192 return string_decoder_decode_value(js, decoder, chunk, flush);
193}
194
195static bool stream_value_is_empty_string(ant_t *js, ant_value_t value) {
196 size_t len = 0;
197 if (vtype(value) != T_STR) return false;
198 (void)js_getstr(js, value, &len);
199 return len == 0;
200}
201
202static ant_value_t stream_make_buffer(ant_t *js, ant_value_t value, ant_value_t encoding) {
203 ant_value_t buffer_ctor = stream_buffer_ctor(js);
204 ant_value_t from_fn = 0;
205 ant_value_t args[2];
206
207 if (is_err(buffer_ctor)) return buffer_ctor;
208 from_fn = js_get(js, buffer_ctor, "from");
209 if (is_err(from_fn) || !is_callable(from_fn))
210 return js_mkerr(js, "Buffer.from is not available");
211
212 args[0] = value;
213 args[1] = encoding;
214 return stream_call(js, from_fn, buffer_ctor, args, 2, false);
215}
216
217static ant_value_t stream_normalize_chunk(
218 ant_t *js,
219 ant_value_t chunk,
220 bool object_mode,
221 ant_value_t encoding
222) {
223 ant_value_t str_val = 0;
224
225 if (
226 object_mode || is_null(chunk) || is_undefined(chunk) ||
227 vtype(chunk) == T_TYPEDARRAY || buffer_is_binary_source(chunk)
228 ) return chunk;
229
230 if (vtype(chunk) == T_STR) return stream_make_buffer(js, chunk, encoding);
231
232 str_val = js_tostring_val(js, chunk);
233 if (is_err(str_val)) return str_val;
234
235 return stream_make_buffer(js, str_val, encoding);
236}
237
238static ant_value_t stream_readable_buffer(ant_t *js, ant_value_t stream_obj) {
239 ant_value_t state = stream_readable_state(js, stream_obj);
240 if (!is_object_type(state)) return js_mkundef();
241 return js_get(js, state, "buffer");
242}
243
244static ant_offset_t stream_readable_buffer_head(ant_t *js, ant_value_t stream_obj) {
245 ant_value_t state = stream_readable_state(js, stream_obj);
246 ant_value_t head = is_object_type(state) ? js_get(js, state, "bufferHead") : js_mkundef();
247 return vtype(head) == T_NUM ? (ant_offset_t)js_getnum(head) : 0;
248}
249
250static void stream_set_readable_buffer_head(ant_t *js, ant_value_t stream_obj, ant_offset_t head) {
251 ant_value_t state = stream_readable_state(js, stream_obj);
252 if (is_object_type(state)) js_set(js, state, "bufferHead", js_mknum((double)head));
253}
254
255static ant_offset_t stream_readable_buffer_len(ant_t *js, ant_value_t stream_obj) {
256 ant_value_t buffer = stream_readable_buffer(js, stream_obj);
257 ant_offset_t head = stream_readable_buffer_head(js, stream_obj);
258 ant_offset_t len = vtype(buffer) == T_ARR ? js_arr_len(js, buffer) : 0;
259 return len > head ? len - head : 0;
260}
261
262static void stream_compact_readable_buffer(ant_t *js, ant_value_t stream_obj) {
263 ant_value_t state = stream_readable_state(js, stream_obj);
264 ant_value_t buffer = stream_readable_buffer(js, stream_obj);
265 ant_offset_t head = stream_readable_buffer_head(js, stream_obj);
266 ant_offset_t len = vtype(buffer) == T_ARR ? js_arr_len(js, buffer) : 0;
267 ant_value_t compact = 0;
268
269 if (!is_object_type(state) || vtype(buffer) != T_ARR) return;
270 if (head == 0) return;
271
272 if (head >= len) {
273 compact = js_mkarr(js);
274 js_set(js, state, "buffer", compact);
275 js_set(js, state, "bufferHead", js_mknum(0));
276 return;
277 }
278
279 if (head <= 32 && head * 2 < len) return;
280 compact = js_mkarr(js);
281 for (ant_offset_t i = head; i < len; i++) js_arr_push(js, compact, js_arr_get(js, buffer, i));
282
283 js_set(js, state, "buffer", compact);
284 js_set(js, state, "bufferHead", js_mknum(0));
285}
286
287static void stream_buffer_push(ant_t *js, ant_value_t stream_obj, ant_value_t value) {
288 ant_value_t state = stream_readable_state(js, stream_obj);
289 ant_value_t buffer = stream_readable_buffer(js, stream_obj);
290
291 if (!is_object_type(state) || vtype(buffer) != T_ARR) return;
292 js_arr_push(js, buffer, value);
293}
294
295static ant_value_t stream_buffer_shift(ant_t *js, ant_value_t stream_obj) {
296 ant_value_t buffer = stream_readable_buffer(js, stream_obj);
297 ant_offset_t head = stream_readable_buffer_head(js, stream_obj);
298 ant_offset_t len = vtype(buffer) == T_ARR ? js_arr_len(js, buffer) : 0;
299 ant_value_t value = js_mkundef();
300
301 if (vtype(buffer) != T_ARR || head >= len) return js_mkundef();
302 value = js_arr_get(js, buffer, head);
303 stream_set_readable_buffer_head(js, stream_obj, head + 1);
304 stream_compact_readable_buffer(js, stream_obj);
305 return value;
306}
307
308static bool stream_listener_count_positive(ant_t *js, ant_value_t target, const char *event_name) {
309 ant_value_t args[1];
310 ant_value_t result = 0;
311
312 args[0] = js_mkstr(js, event_name, strlen(event_name));
313 result = stream_call_prop(js, target, "listenerCount", args, 1);
314 return vtype(result) == T_NUM && js_getnum(result) > 0;
315}
316
317static void stream_remove_listener(
318 ant_t *js,
319 ant_value_t target,
320 const char *event_name,
321 ant_value_t listener
322) {
323 ant_value_t args[2];
324 args[0] = js_mkstr(js, event_name, strlen(event_name));
325 args[1] = listener;
326 stream_call_prop(js, target, "removeListener", args, 2);
327}
328
329static ant_value_t stream_get_option(ant_t *js, ant_value_t options, const char *name) {
330 if (!is_object_type(options)) return js_mkundef();
331 return js_get(js, options, name);
332}
333
334static double stream_default_high_water_mark(bool object_mode) {
335 return object_mode ? g_default_object_high_water_mark : g_default_high_water_mark;
336}
337
338static double stream_high_water_mark_from_options(ant_t *js, ant_value_t options, bool object_mode) {
339 ant_value_t hwm = stream_get_option(js, options, "highWaterMark");
340 return (vtype(hwm) == T_NUM && js_getnum(hwm) > 0)
341 ? js_getnum(hwm)
342 : stream_default_high_water_mark(object_mode);
343}
344
345static ant_value_t stream_make_base_object(ant_t *js, ant_value_t proto) {
346 ant_value_t obj = js_mkobj(js);
347 stream_private_state_t *priv = calloc(1, sizeof(*priv));
348
349 if (is_object_type(proto)) js_set_proto_init(obj, proto);
350 js_set_native_tag(obj, STREAM_NATIVE_TAG);
351
352 if (priv) js_set_native_ptr(obj, priv);
353 js_set_slot(obj, SLOT_AUX, js_mkundef());
354 js_set_finalizer(obj, stream_finalize);
355
356 return obj;
357}
358
359static void stream_init_base(ant_t *js, ant_value_t obj, ant_value_t raw_options) {
360 ant_value_t pipes = js_mkarr(js);
361 js_set(js, obj, "readable", js_true);
362 js_set(js, obj, "writable", js_true);
363 js_set(js, obj, "destroyed", js_false);
364 js_set(js, obj, "_paused", js_false);
365 js_set(js, obj, "_pipes", pipes);
366 js_set(js, obj, "_streamOptions", stream_truthy_or_object(js, raw_options));
367}
368
369static void stream_init_readable(ant_t *js, ant_value_t obj, ant_value_t raw_options) {
370 ant_value_t options = is_object_type(raw_options) ? raw_options : js_mkobj(js);
371 ant_value_t state = js_mkobj(js);
372 ant_value_t read_fn = stream_get_option(js, options, "read");
373
374 bool object_mode = js_truthy(js, stream_get_option(js, options, "objectMode"));
375 double high_water_mark = stream_high_water_mark_from_options(js, options, object_mode);
376
377 stream_init_base(js, obj, raw_options);
378 js_set(js, obj, "readable", js_true);
379 js_set(js, obj, "writable", js_false);
380 js_set(js, obj, "readableEnded", js_false);
381
382 js_set(js, state, "objectMode", js_bool(object_mode));
383 js_set(js, state, "ended", js_false);
384 js_set(js, state, "endEmitted", js_false);
385 js_set(js, state, "flowing", js_false);
386 js_set(js, state, "flowingReadScheduled", js_false);
387 js_set(js, state, "reading", js_false);
388 js_set(js, state, "highWaterMark", js_mknum(high_water_mark));
389 js_set(js, state, "buffer", js_mkarr(js));
390 js_set(js, state, "bufferHead", js_mknum(0));
391 js_set(js, obj, "_readableState", state);
392
393 if (is_callable(read_fn)) js_set(js, obj, "_read", read_fn);
394}
395
396static void stream_init_writable(ant_t *js, ant_value_t obj, ant_value_t raw_options) {
397 ant_value_t options = is_object_type(raw_options) ? raw_options : js_mkobj(js);
398 ant_value_t state = js_mkobj(js);
399 bool object_mode = js_truthy(js, stream_get_option(js, options, "objectMode"))
400 || js_truthy(js, stream_get_option(js, options, "writableObjectMode"));
401 ant_value_t write_fn = stream_get_option(js, options, "write");
402 ant_value_t final_fn = stream_get_option(js, options, "final");
403 ant_value_t destroy_fn = stream_get_option(js, options, "destroy");
404
405 stream_init_base(js, obj, raw_options);
406 js_set(js, obj, "readable", js_false);
407 js_set(js, obj, "writable", js_true);
408 js_set(js, obj, "writableEnded", js_false);
409 js_set(js, obj, "writableFinished", js_false);
410
411 js_set(js, state, "objectMode", js_bool(object_mode));
412 js_set(js, state, "finished", js_false);
413 js_set(js, state, "ended", js_false);
414 js_set(js, obj, "_writableState", state);
415
416 if (is_callable(write_fn)) js_set(js, obj, "_write", write_fn);
417 if (is_callable(final_fn)) js_set(js, obj, "_final", final_fn);
418 if (is_callable(destroy_fn)) js_set(js, obj, "_destroy", destroy_fn);
419}
420
421static ant_value_t stream_construct(
422 ant_t *js,
423 ant_value_t base_proto,
424 ant_value_t raw_options,
425 void (*init_fn)(ant_t *, ant_value_t, ant_value_t)
426) {
427 ant_value_t proto = js_instance_proto_from_new_target(js, base_proto);
428 ant_value_t obj = stream_make_base_object(js, is_object_type(proto) ? proto : base_proto);
429 init_fn(js, obj, raw_options);
430 return obj;
431}
432
433static ant_value_t stream_emit_named(ant_t *js, ant_value_t stream_obj, const char *event_name) {
434 return js_bool(eventemitter_emit_args(js, stream_obj, event_name, NULL, 0));
435}
436
437static void stream_emit_error(ant_t *js, ant_value_t stream_obj, ant_value_t error) {
438 ant_value_t args[1];
439 args[0] = error;
440 eventemitter_emit_args(js, stream_obj, "error", args, 1);
441}
442
443static void stream_readable_schedule_continue_flowing(ant_t *js, ant_value_t stream_obj) {
444 ant_value_t state = stream_readable_state(js, stream_obj);
445
446 if (!is_object_type(state)) return;
447 if (!js_truthy(js, js_get(js, state, "flowing"))) return;
448 if (js_truthy(js, js_get(js, stream_obj, "destroyed"))) return;
449 if (js_truthy(js, js_get(js, state, "ended"))) return;
450 if (stream_readable_buffer_len(js, stream_obj) > 0) return;
451 if (js_truthy(js, js_get(js, state, "flowingReadScheduled"))) return;
452
453 js_set(js, state, "flowingReadScheduled", js_true);
454 stream_schedule_microtask(js, stream_readable_continue_flowing, stream_obj);
455}
456
457static ant_value_t js_stream_pause(ant_t *js, ant_value_t *args, int nargs) {
458 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "stream");
459 if (is_err(stream_obj)) return stream_obj;
460
461 js_set(js, stream_obj, "_paused", js_true);
462 stream_emit_named(js, stream_obj, "pause");
463 return stream_obj;
464}
465
466static ant_value_t js_stream_resume(ant_t *js, ant_value_t *args, int nargs) {
467 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "stream");
468 if (is_err(stream_obj)) return stream_obj;
469
470 js_set(js, stream_obj, "_paused", js_false);
471 stream_emit_named(js, stream_obj, "resume");
472 return stream_obj;
473}
474
475static ant_value_t js_stream_is_paused(ant_t *js, ant_value_t *args, int nargs) {
476 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "stream");
477 ant_value_t paused = 0;
478 if (is_err(stream_obj)) return stream_obj;
479 paused = js_get(js, stream_obj, "_paused");
480 return js_bool(js_truthy(js, paused));
481}
482
483static void stream_pipe_remove_state(ant_t *js, ant_value_t source, ant_value_t state_obj) {
484 ant_value_t pipes = stream_pipes(js, source);
485 ant_offset_t len = vtype(pipes) == T_ARR ? js_arr_len(js, pipes) : 0;
486 ant_value_t next = js_mkarr(js);
487
488 for (ant_offset_t i = 0; i < len; i++) {
489 ant_value_t item = js_arr_get(js, pipes, i);
490 if (item != state_obj) js_arr_push(js, next, item);
491 }
492
493 js_set(js, source, "_pipes", next);
494}
495
496static void stream_pipe_cleanup(ant_t *js, ant_value_t state_obj) {
497 ant_value_t cleaned = js_get(js, state_obj, "cleaned");
498 ant_value_t source = js_get(js, state_obj, "source");
499 ant_value_t dest = js_get(js, state_obj, "dest");
500 ant_value_t on_data = js_get(js, state_obj, "onData");
501 ant_value_t on_drain = js_get(js, state_obj, "onDrain");
502 ant_value_t on_end = js_get(js, state_obj, "onEnd");
503 ant_value_t on_close = js_get(js, state_obj, "onClose");
504 ant_value_t on_error = js_get(js, state_obj, "onError");
505
506 if (js_truthy(js, cleaned)) return;
507 js_set(js, state_obj, "cleaned", js_true);
508
509 if (stream_is_instance(source)) {
510 stream_remove_listener(js, source, "data", on_data);
511 stream_remove_listener(js, source, "end", on_end);
512 stream_remove_listener(js, source, "close", on_close);
513 stream_remove_listener(js, source, "error", on_error);
514 stream_pipe_remove_state(js, source, state_obj);
515 }
516
517 if (is_object_type(dest))
518 stream_remove_listener(js, dest, "drain", on_drain);
519}
520
521static ant_value_t stream_pipe_on_data(ant_t *js, ant_value_t *args, int nargs) {
522 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
523 ant_value_t source = js_get(js, state_obj, "source");
524 ant_value_t dest = js_get(js, state_obj, "dest");
525 ant_value_t result = js_mkundef();
526
527 if (!is_object_type(dest)) return js_mkundef();
528 result = stream_call_prop(js, dest, "write", nargs > 0 ? &args[0] : NULL, nargs > 0 ? 1 : 0);
529 if (is_err(result)) return result;
530
531 if (result == js_false) stream_call_prop(js, source, "pause", NULL, 0);
532 return js_mkundef();
533}
534
535static ant_value_t stream_pipe_on_drain(ant_t *js, ant_value_t *args, int nargs) {
536 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
537 ant_value_t source = js_get(js, state_obj, "source");
538 stream_call_prop(js, source, "resume", NULL, 0);
539 return js_mkundef();
540}
541
542static ant_value_t stream_pipe_on_end(ant_t *js, ant_value_t *args, int nargs) {
543 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
544 ant_value_t dest = js_get(js, state_obj, "dest");
545 bool end_dest = js_truthy(js, js_get(js, state_obj, "end"));
546 stream_pipe_cleanup(js, state_obj);
547 if (end_dest) stream_call_prop(js, dest, "end", NULL, 0);
548 return js_mkundef();
549}
550
551static ant_value_t stream_pipe_on_close(ant_t *js, ant_value_t *args, int nargs) {
552 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
553 stream_pipe_cleanup(js, state_obj);
554 return js_mkundef();
555}
556
557static ant_value_t stream_pipe_on_error(ant_t *js, ant_value_t *args, int nargs) {
558 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
559 ant_value_t dest = js_get(js, state_obj, "dest");
560 stream_pipe_cleanup(js, state_obj);
561 if (is_object_type(dest) && stream_listener_count_positive(js, dest, "error") && nargs > 0)
562 eventemitter_emit_args(js, dest, "error", &args[0], 1);
563 return js_mkundef();
564}
565
566static ant_value_t js_stream_pipe(ant_t *js, ant_value_t *args, int nargs) {
567 ant_value_t source = stream_require_this(js, js_getthis(js), "stream");
568 ant_value_t options = nargs > 1 ? args[1] : js_mkundef();
569 ant_value_t state_obj = 0;
570 ant_value_t readable_state = 0;
571 bool end_dest = true;
572
573 if (is_err(source)) return source;
574 if (nargs < 1 || !is_object_type(args[0])) return js_mkerr(js, "pipe requires a destination stream");
575 if (is_object_type(options)) {
576 ant_value_t end_val = js_get(js, options, "end");
577 if (!is_undefined(end_val)) end_dest = end_val != js_false;
578 }
579
580 state_obj = js_mkobj(js);
581 js_set(js, state_obj, "source", source);
582 js_set(js, state_obj, "dest", args[0]);
583 js_set(js, state_obj, "end", js_bool(end_dest));
584 js_set(js, state_obj, "cleaned", js_false);
585 js_set(js, state_obj, "onData", js_heavy_mkfun(js, stream_pipe_on_data, state_obj));
586 js_set(js, state_obj, "onDrain", js_heavy_mkfun(js, stream_pipe_on_drain, state_obj));
587 js_set(js, state_obj, "onEnd", js_heavy_mkfun(js, stream_pipe_on_end, state_obj));
588 js_set(js, state_obj, "onClose", js_heavy_mkfun(js, stream_pipe_on_close, state_obj));
589 js_set(js, state_obj, "onError", js_heavy_mkfun(js, stream_pipe_on_error, state_obj));
590
591 js_arr_push(js, stream_pipes(js, source), state_obj);
592 eventemitter_add_listener(js, source, "data", js_get(js, state_obj, "onData"), false);
593 eventemitter_add_listener(js, source, "end", js_get(js, state_obj, "onEnd"), true);
594 eventemitter_add_listener(js, source, "close", js_get(js, state_obj, "onClose"), true);
595 eventemitter_add_listener(js, source, "error", js_get(js, state_obj, "onError"), false);
596 eventemitter_add_listener(js, args[0], "drain", js_get(js, state_obj, "onDrain"), false);
597 eventemitter_emit_args(js, args[0], "pipe", &source, 1);
598 readable_state = stream_readable_state(js, source);
599 if (is_object_type(readable_state)) js_set(js, readable_state, "flowing", js_true);
600 stream_call_prop(js, source, "resume", NULL, 0);
601
602 return args[0];
603}
604
605static ant_value_t js_stream_unpipe(ant_t *js, ant_value_t *args, int nargs) {
606 ant_value_t source = stream_require_this(js, js_getthis(js), "stream");
607 ant_value_t pipes = 0;
608 ant_value_t matches = 0;
609 ant_offset_t len = 0;
610 ant_value_t dest = nargs > 0 ? args[0] : js_mkundef();
611
612 if (is_err(source)) return source;
613 pipes = stream_pipes(js, source);
614 if (vtype(pipes) != T_ARR) return source;
615
616 matches = js_mkarr(js);
617 len = js_arr_len(js, pipes);
618 for (ant_offset_t i = 0; i < len; i++) {
619 ant_value_t state_obj = js_arr_get(js, pipes, i);
620 ant_value_t entry_dest = js_get(js, state_obj, "dest");
621 if (!is_object_type(dest) || entry_dest == dest) js_arr_push(js, matches, state_obj);
622 }
623
624 len = js_arr_len(js, matches);
625 for (ant_offset_t i = 0; i < len; i++) stream_pipe_cleanup(js, js_arr_get(js, matches, i));
626 return source;
627}
628
629static ant_value_t stream_destroy_done(ant_t *js, ant_value_t *args, int nargs) {
630 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
631 ant_value_t stream_obj = js_get(js, state_obj, "stream");
632 ant_value_t destroyed_err = (nargs > 0) ? args[0] : js_mkundef();
633 if (!is_null(destroyed_err) && !is_undefined(destroyed_err)) stream_emit_error(js, stream_obj, destroyed_err);
634 stream_emit_named(js, stream_obj, "close");
635 return js_mkundef();
636}
637
638static ant_value_t stream_once_call(ant_t *js, ant_value_t *args, int nargs) {
639 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
640 ant_value_t fn = js_get(js, state_obj, "fn");
641 ant_value_t this_val = js_get(js, state_obj, "thisVal");
642 ant_value_t called = js_get(js, state_obj, "called");
643
644 if (js_truthy(js, called)) return js_mkundef();
645 js_set(js, state_obj, "called", js_true);
646 return stream_call(js, fn, this_val, args, nargs, false);
647}
648
649static ant_value_t stream_make_once(ant_t *js, ant_value_t fn, ant_value_t this_val) {
650 ant_value_t state_obj = js_mkobj(js);
651 js_set(js, state_obj, "fn", fn);
652 js_set(js, state_obj, "thisVal", this_val);
653 js_set(js, state_obj, "called", js_false);
654 return js_heavy_mkfun(js, stream_once_call, state_obj);
655}
656
657static ant_value_t js_stream_destroy(ant_t *js, ant_value_t *args, int nargs) {
658 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "stream");
659 ant_value_t destroy_fn = 0;
660 ant_value_t done_state = 0;
661 ant_value_t done = 0;
662 ant_value_t destroy_args[2];
663 ant_value_t error = nargs > 0 ? args[0] : js_mkundef();
664 ant_value_t result = 0;
665
666 if (is_err(stream_obj)) return stream_obj;
667 if (js_truthy(js, js_get(js, stream_obj, "destroyed"))) return stream_obj;
668
669 js_set(js, stream_obj, "destroyed", js_true);
670
671 done_state = js_mkobj(js);
672 js_set(js, done_state, "stream", stream_obj);
673 done = stream_make_once(js, js_heavy_mkfun(js, stream_destroy_done, done_state), js_mkundef());
674 destroy_fn = js_getprop_fallback(js, stream_obj, "_destroy");
675
676 if (is_callable(destroy_fn)) {
677 destroy_args[0] = is_undefined(error) ? js_mknull() : error;
678 destroy_args[1] = done;
679 result = stream_call(js, destroy_fn, stream_obj, destroy_args, 2, false);
680 return is_err(result) ? result : stream_obj;
681 }
682
683 destroy_args[0] = is_undefined(error) ? js_mknull() : error;
684 stream_call_callback(js, done, destroy_args, 1);
685 return stream_obj;
686}
687
688static ant_value_t js_readable__read(ant_t *js, ant_value_t *args, int nargs) {
689 return js_mkundef();
690}
691
692static ant_value_t stream_readable_start_flowing(ant_t *js, ant_value_t *args, int nargs) {
693 ant_value_t stream_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
694 return stream_readable_begin_flowing(js, stream_obj);
695}
696
697ant_value_t stream_readable_continue_flowing(ant_t *js, ant_value_t *args, int nargs) {
698 ant_value_t stream_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
699 ant_value_t state = stream_readable_state(js, stream_obj);
700
701 if (!is_object_type(state)) return js_mkundef();
702 js_set(js, state, "flowingReadScheduled", js_false);
703
704 if (!js_truthy(js, js_get(js, state, "flowing"))) return js_mkundef();
705 if (js_truthy(js, js_get(js, stream_obj, "destroyed"))) return js_mkundef();
706 if (js_truthy(js, js_get(js, state, "ended"))) return js_mkundef();
707
708 stream_readable_maybe_read(js, stream_obj);
709 stream_readable_flush(js, stream_obj);
710
711 return js_mkundef();
712}
713
714ant_value_t stream_readable_begin_flowing(ant_t *js, ant_value_t stream_obj) {
715 ant_value_t state = stream_readable_state(js, stream_obj);
716
717 if (!is_object_type(state)) return js_mkundef();
718 if (!js_truthy(js, js_get(js, state, "flowing"))) return js_mkundef();
719
720 {
721 ant_value_t saved_this = js->this_val;
722 js->this_val = stream_obj;
723 js_stream_resume(js, NULL, 0);
724 js->this_val = saved_this;
725 }
726
727 stream_readable_maybe_read(js, stream_obj);
728 stream_readable_flush(js, stream_obj);
729
730 return js_mkundef();
731}
732
733ant_value_t stream_readable_flush(ant_t *js, ant_value_t stream_obj) {
734 ant_value_t state = stream_readable_state(js, stream_obj);
735 bool emitted_data = false;
736
737 if (!is_object_type(state)) return js_mkundef();
738
739 while (js_truthy(js, js_get(js, state, "flowing")) && stream_readable_buffer_len(js, stream_obj) > 0) {
740 ant_value_t chunk = stream_buffer_shift(js, stream_obj);
741 chunk = stream_readable_decode_chunk(js, stream_obj, chunk, false);
742 if (is_err(chunk)) return chunk;
743 emitted_data = true;
744 eventemitter_emit_args(js, stream_obj, "data", &chunk, 1);
745 }
746
747 if (
748 js_truthy(js, js_get(js, state, "ended")) &&
749 stream_readable_buffer_len(js, stream_obj) == 0 &&
750 !js_truthy(js, js_get(js, state, "endEmitted"))
751 ) {
752 ant_value_t tail = stream_readable_decode_chunk(js, stream_obj, js_mkundef(), true);
753 if (is_err(tail)) return tail;
754 if (!is_undefined(tail) && !stream_value_is_empty_string(js, tail)) {
755 emitted_data = true;
756 eventemitter_emit_args(js, stream_obj, "data", &tail, 1);
757 }
758 js_set(js, state, "endEmitted", js_true);
759 js_set(js, stream_obj, "readableEnded", js_true);
760 stream_emit_named(js, stream_obj, "end");
761 stream_emit_named(js, stream_obj, "close");
762 } else if (emitted_data) stream_readable_schedule_continue_flowing(js, stream_obj);
763
764 return js_mkundef();
765}
766
767ant_value_t stream_readable_maybe_read(ant_t *js, ant_value_t stream_obj) {
768 ant_value_t state = stream_readable_state(js, stream_obj);
769 ant_value_t read_fn = 0;
770 ant_value_t args[1];
771
772 if (!is_object_type(state)) return js_mkundef();
773 if (js_truthy(js, js_get(js, stream_obj, "destroyed"))) return js_mkundef();
774 if (js_truthy(js, js_get(js, state, "reading"))) return js_mkundef();
775 if (js_truthy(js, js_get(js, state, "ended"))) return js_mkundef();
776 if (stream_readable_buffer_len(js, stream_obj) > 0) return js_mkundef();
777
778 read_fn = js_getprop_fallback(js, stream_obj, "_read");
779 js_set(js, state, "reading", js_true);
780 args[0] = js_get(js, state, "highWaterMark");
781
782 if (is_callable(read_fn)) stream_call(js, read_fn, stream_obj, args, 1, false);
783 js_set(js, state, "reading", js_false);
784
785 return js_mkundef();
786}
787
788ant_value_t stream_readable_push_value(
789 ant_t *js,
790 ant_value_t stream_obj,
791 ant_value_t chunk,
792 ant_value_t encoding
793) {
794 ant_value_t state = stream_readable_state(js, stream_obj);
795 ant_value_t normalized = 0;
796
797 if (!is_object_type(state)) return js_false;
798 if (js_truthy(js, js_get(js, stream_obj, "destroyed"))) return js_false;
799
800 if (is_null(chunk)) {
801 js_set(js, state, "ended", js_true);
802 stream_readable_flush(js, stream_obj);
803 return js_false;
804 }
805
806 normalized = stream_normalize_chunk(
807 js, chunk,
808 js_truthy(js, js_get(js, state, "objectMode")),
809 is_undefined(encoding) ? js_mkstr(js, "utf8", 4) : encoding
810 );
811 if (is_err(normalized)) return normalized;
812
813 stream_buffer_push(js, stream_obj, normalized);
814 if (js_truthy(js, js_get(js, state, "flowing"))) stream_readable_flush(js, stream_obj);
815
816 return js_bool(js_truthy(js, js_get(js, state, "flowing")));
817}
818
819static ant_value_t js_readable_push(ant_t *js, ant_value_t *args, int nargs) {
820 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Readable");
821 ant_value_t chunk = nargs > 0 ? args[0] : js_mkundef();
822 ant_value_t encoding = nargs > 1 ? args[1] : js_mkundef();
823 if (is_err(stream_obj)) return stream_obj;
824 return stream_readable_push_value(js, stream_obj, chunk, encoding);
825}
826
827static ant_value_t js_readable_read(ant_t *js, ant_value_t *args, int nargs) {
828 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Readable");
829 ant_value_t state = 0;
830 ant_value_t chunk = 0;
831
832 if (is_err(stream_obj)) return stream_obj;
833 state = stream_readable_state(js, stream_obj);
834 if (!is_object_type(state)) return js_mknull();
835
836 if (stream_readable_buffer_len(js, stream_obj) == 0) stream_readable_maybe_read(js, stream_obj);
837 if (stream_readable_buffer_len(js, stream_obj) == 0) return js_mknull();
838
839 chunk = stream_buffer_shift(js, stream_obj);
840 chunk = stream_readable_decode_chunk(js, stream_obj, chunk, false);
841 if (is_err(chunk)) return chunk;
842 if (js_truthy(js, js_get(js, state, "flowing"))) stream_readable_flush(js, stream_obj);
843
844 return chunk;
845}
846
847static ant_value_t js_readable_set_encoding(ant_t *js, ant_value_t *args, int nargs) {
848 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Readable");
849 ant_value_t state = 0; ant_value_t decoder = 0;
850
851 ant_value_t encoding = nargs > 0 && !is_undefined(args[0]) ? args[0] : js_mkstr(js, "utf8", 4);
852 ant_value_t encoding_str = 0;
853
854 if (is_err(stream_obj)) return stream_obj;
855 state = stream_readable_state(js, stream_obj);
856 if (!is_object_type(state)) return stream_obj;
857
858 decoder = string_decoder_create(js, encoding);
859 if (is_err(decoder)) return decoder;
860 encoding_str = js_tostring_val(js, encoding);
861 if (is_err(encoding_str)) return encoding_str;
862
863 js_set(js, state, "decoder", decoder);
864 js_set(js, stream_obj, "encoding", encoding_str);
865 js_set(js, stream_obj, "readableEncoding", encoding_str);
866
867 return stream_obj;
868}
869
870static ant_value_t js_readable_on(ant_t *js, ant_value_t *args, int nargs) {
871 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Readable");
872 ant_value_t key = 0;
873 ant_value_t state = 0;
874
875 if (is_err(stream_obj)) return stream_obj;
876 if (nargs < 2) return js_mkerr(js, "on requires 2 arguments (event, listener)");
877 key = stream_event_key(js, args[0]);
878
879 if (is_err(key)) return key;
880 if (!eventemitter_add_listener_val(js, stream_obj, key, args[1], false))
881 return js_mkerr(js, "listener must be a function");
882
883 if (stream_key_is_cstr(js, key, "data")) {
884 state = stream_readable_state(js, stream_obj);
885 if (is_object_type(state)) js_set(js, state, "flowing", js_true);
886 stream_schedule_microtask(js, stream_readable_start_flowing, stream_obj);
887 }
888
889 return stream_obj;
890}
891
892static ant_value_t js_readable_resume(ant_t *js, ant_value_t *args, int nargs) {
893 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Readable");
894 ant_value_t state = 0;
895 if (is_err(stream_obj)) return stream_obj;
896
897 state = stream_readable_state(js, stream_obj);
898 if (is_object_type(state)) js_set(js, state, "flowing", js_true);
899
900 js_stream_resume(js, NULL, 0);
901 stream_readable_maybe_read(js, stream_obj);
902 stream_readable_flush(js, stream_obj);
903
904 return stream_obj;
905}
906
907static ant_value_t js_readable_pause(ant_t *js, ant_value_t *args, int nargs) {
908 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Readable");
909 ant_value_t state = 0;
910 if (is_err(stream_obj)) return stream_obj;
911
912 state = stream_readable_state(js, stream_obj);
913 if (is_object_type(state)) js_set(js, state, "flowing", js_false);
914 return js_stream_pause(js, args, nargs);
915}
916
917static ant_value_t js_writable__write(ant_t *js, ant_value_t *args, int nargs) {
918 ant_value_t callback = nargs > 2 ? args[2] : js_mkundef();
919 stream_call_callback(js, callback, NULL, 0);
920 return js_mkundef();
921}
922
923static ant_value_t js_writable__final(ant_t *js, ant_value_t *args, int nargs) {
924 ant_value_t callback = nargs > 0 ? args[0] : js_mkundef();
925 stream_call_callback(js, callback, NULL, 0);
926 return js_mkundef();
927}
928
929static ant_value_t stream_writable_write_done(ant_t *js, ant_value_t *args, int nargs) {
930 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
931 ant_value_t stream_obj = js_get(js, state_obj, "stream");
932 ant_value_t callback = js_get(js, state_obj, "callback");
933
934 stream_private_state_t *priv = stream_private_state(stream_obj);
935 ant_value_t err = nargs > 0 ? args[0] : js_mkundef();
936
937 if (priv) priv->writing = false;
938
939 if (!is_undefined(err) && !is_null(err)) {
940 ant_value_t destroy_args[1] = { err };
941 js_set(js, state_obj, "done", js_true);
942 if (priv) priv->pending_final = false; {
943 ant_value_t saved_this = js->this_val;
944 js->this_val = stream_obj;
945 js_stream_destroy(js, destroy_args, 1);
946 js->this_val = saved_this;
947 }
948
949 if (is_callable(callback)) stream_call_callback(js, callback, &err, 1);
950 return js_mkundef();
951 }
952
953 if (is_callable(callback)) stream_call_callback(js, callback, NULL, 0);
954 stream_emit_named(js, stream_obj, "drain");
955
956 if (priv && priv->pending_final && !priv->final_started) {
957 ant_value_t end_callback = js_get_slot(stream_obj, SLOT_AUX);
958 priv->pending_final = false;
959 stream_set_end_callback(js, stream_obj, js_mkundef());
960 return stream_writable_begin_end(js, stream_obj, end_callback);
961 }
962
963 return js_mkundef();
964}
965
966static ant_value_t stream_writable_write_impl(
967 ant_t *js,
968 ant_value_t stream_obj,
969 ant_value_t chunk,
970 ant_value_t encoding,
971 ant_value_t callback,
972 bool allow_after_end
973) {
974 ant_value_t state = 0;
975 ant_value_t normalized = 0;
976 ant_value_t write_fn = 0;
977 ant_value_t done_state = 0;
978 ant_value_t done = 0;
979 ant_value_t write_args[3];
980
981 state = stream_writable_state(js, stream_obj);
982 if (!is_object_type(state)) return js_false;
983
984 if (
985 (!allow_after_end && js_truthy(js, js_get(js, stream_obj, "writableEnded"))) ||
986 js_truthy(js, js_get(js, stream_obj, "destroyed"))
987 ) {
988 ant_value_t err = js_mkerr(js, "write after end");
989 if (is_callable(callback)) stream_call_callback(js, callback, &err, 1);
990 else stream_emit_error(js, stream_obj, err);
991 return js_false;
992 }
993
994 normalized = stream_normalize_chunk(
995 js, chunk,
996 js_truthy(js, js_get(js, state, "objectMode")),
997 encoding
998 );
999
1000 if (is_err(normalized)) return normalized;
1001 done_state = js_mkobj(js);
1002
1003 js_set(js, done_state, "stream", stream_obj);
1004 js_set(js, done_state, "callback", callback);
1005 js_set(js, done_state, "done", js_false);
1006
1007 done = stream_make_once(js, js_heavy_mkfun(js, stream_writable_write_done, done_state), js_mkundef());
1008 write_fn = js_getprop_fallback(js, stream_obj, "_write");
1009 stream_private_state_t *priv = stream_private_state(stream_obj);
1010 if (priv) priv->writing = true;
1011
1012 write_args[0] = normalized;
1013 write_args[1] = encoding;
1014 write_args[2] = done;
1015
1016 if (is_callable(write_fn)) {
1017 ant_value_t result = stream_call(js, write_fn, stream_obj, write_args, 3, false);
1018 if (is_err(result)) {
1019 ant_value_t err_args[1] = { result };
1020 stream_call_callback(js, done, err_args, 1);
1021 return js_false;
1022 }}
1023
1024 return js_bool(!js_truthy(js, js_get(js, stream_obj, "destroyed")));
1025}
1026
1027static ant_value_t js_writable_write(ant_t *js, ant_value_t *args, int nargs) {
1028 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Writable");
1029 ant_value_t callback = js_mkundef();
1030 ant_value_t encoding = js_mkstr(js, "utf8", 4);
1031
1032 if (is_err(stream_obj)) return stream_obj;
1033
1034 if (nargs > 1 && is_callable(args[1])) callback = args[1];
1035 else if (nargs > 1 && !is_undefined(args[1])) encoding = args[1];
1036 if (nargs > 2 && is_callable(args[2])) callback = args[2];
1037
1038 return stream_writable_write_impl(
1039 js, stream_obj,
1040 nargs > 0 ? args[0] : js_mkundef(),
1041 encoding, callback, false
1042 );
1043}
1044
1045static ant_value_t stream_writable_end_done(ant_t *js, ant_value_t *args, int nargs) {
1046 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
1047 ant_value_t stream_obj = js_get(js, state_obj, "stream");
1048 ant_value_t callback = js_get(js, state_obj, "callback");
1049 ant_value_t err = nargs > 0 ? args[0] : js_mkundef();
1050
1051 if (!is_undefined(err) && !is_null(err)) {
1052 ant_value_t destroy_args[1] = { err };
1053 ant_value_t saved_this = js->this_val;
1054 js->this_val = stream_obj;
1055 js_stream_destroy(js, destroy_args, 1);
1056 js->this_val = saved_this;
1057 if (is_callable(callback)) stream_call_callback(js, callback, &err, 1);
1058 return js_mkundef();
1059 }
1060
1061 js_set(js, stream_obj, "writableFinished", js_true);
1062 js_set(js, stream_writable_state(js, stream_obj), "finished", js_true);
1063 stream_emit_named(js, stream_obj, "finish");
1064
1065 if (is_callable(callback)) stream_call_callback(js, callback, NULL, 0);
1066 if (!js_truthy(js, js_get(js, stream_obj, "readable"))) stream_emit_named(js, stream_obj, "close");
1067
1068 return js_mkundef();
1069}
1070
1071ant_value_t stream_writable_begin_end(ant_t *js, ant_value_t stream_obj, ant_value_t callback) {
1072 ant_value_t final_fn = 0;
1073 ant_value_t final_args[1];
1074 ant_value_t done_state = 0;
1075 ant_value_t done = 0;
1076 stream_private_state_t *priv = stream_private_state(stream_obj);
1077
1078 done_state = js_mkobj(js);
1079 js_set(js, done_state, "stream", stream_obj);
1080 js_set(js, done_state, "callback", callback);
1081
1082 done = stream_make_once(js, js_heavy_mkfun(js, stream_writable_end_done, done_state), js_mkundef());
1083 if (priv) {
1084 priv->final_started = true;
1085 priv->pending_final = false;
1086 }
1087
1088 stream_set_end_callback(js, stream_obj, js_mkundef());
1089 final_fn = js_getprop_fallback(js, stream_obj, "_final");
1090 final_args[0] = done;
1091
1092 if (is_callable(final_fn)) stream_call(js, final_fn, stream_obj, final_args, 1, false);
1093 else stream_call_callback(js, done, NULL, 0);
1094
1095 return stream_obj;
1096}
1097
1098static ant_value_t stream_writable_end_after_write(ant_t *js, ant_value_t *args, int nargs) {
1099 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
1100 ant_value_t stream_obj = js_get(js, state_obj, "stream");
1101 ant_value_t callback = js_get(js, state_obj, "callback");
1102 stream_private_state_t *priv = stream_private_state(stream_obj);
1103 ant_value_t err = nargs > 0 ? args[0] : js_mkundef();
1104
1105 if (!is_undefined(err) && !is_null(err)) {
1106 ant_value_t destroy_args[1] = { err };
1107 ant_value_t saved_this = js->this_val;
1108 js->this_val = stream_obj;
1109 js_stream_destroy(js, destroy_args, 1);
1110 js->this_val = saved_this;
1111 if (is_callable(callback)) stream_call_callback(js, callback, &err, 1);
1112 return js_mkundef();
1113 }
1114
1115 if (priv) priv->pending_final = false;
1116 stream_set_end_callback(js, stream_obj, js_mkundef());
1117
1118 return stream_writable_begin_end(js, stream_obj, callback);
1119}
1120
1121static ant_value_t js_writable_end(ant_t *js, ant_value_t *args, int nargs) {
1122 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Writable");
1123 ant_value_t callback = js_mkundef();
1124 ant_value_t chunk = js_mkundef();
1125 ant_value_t encoding = js_mkundef();
1126 ant_value_t after_write_state = 0;
1127 ant_value_t after_write = 0;
1128 stream_private_state_t *priv = NULL;
1129
1130 if (is_err(stream_obj)) return stream_obj;
1131 if (nargs > 0 && is_callable(args[0])) callback = args[0];
1132 else {
1133 if (nargs > 0) chunk = args[0];
1134 if (nargs > 1 && is_callable(args[1])) callback = args[1];
1135 else if (nargs > 1) encoding = args[1];
1136 if (nargs > 2 && is_callable(args[2])) callback = args[2];
1137 }
1138
1139 if (js_truthy(js, js_get(js, stream_obj, "writableEnded"))) {
1140 if (is_callable(callback)) stream_call_callback(js, callback, NULL, 0);
1141 return stream_obj;
1142 }
1143
1144 js_set(js, stream_obj, "writableEnded", js_true);
1145 js_set(js, stream_writable_state(js, stream_obj), "ended", js_true);
1146 priv = stream_private_state(stream_obj);
1147
1148 if (!is_undefined(chunk) && !is_null(chunk)) {
1149 after_write_state = js_mkobj(js);
1150 js_set(js, after_write_state, "stream", stream_obj);
1151 js_set(js, after_write_state, "callback", callback);
1152
1153 after_write = stream_make_once(
1154 js, js_heavy_mkfun(js, stream_writable_end_after_write, after_write_state),
1155 js_mkundef()
1156 );
1157
1158 stream_writable_write_impl(js, stream_obj, chunk, encoding, after_write, true);
1159 return stream_obj;
1160 }
1161
1162 if (priv && priv->writing && !priv->final_started) {
1163 priv->pending_final = true;
1164 stream_set_end_callback(js, stream_obj, callback);
1165 return stream_obj;
1166 }
1167
1168 return stream_writable_begin_end(js, stream_obj, callback);
1169}
1170
1171static ant_value_t js_transform__transform(ant_t *js, ant_value_t *args, int nargs) {
1172 ant_value_t callback = nargs > 2 ? args[2] : js_mkundef();
1173 ant_value_t cb_args[2];
1174 cb_args[0] = js_mknull();
1175 cb_args[1] = nargs > 0 ? args[0] : js_mkundef();
1176 stream_call_callback(js, callback, cb_args, 2);
1177 return js_mkundef();
1178}
1179
1180static ant_value_t stream_transform_write_callback(ant_t *js, ant_value_t *args, int nargs) {
1181 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
1182 ant_value_t stream_obj = js_get(js, state_obj, "stream");
1183 ant_value_t outer_callback = js_get(js, state_obj, "callback");
1184
1185 if (nargs > 0 && !is_null(args[0]) && !is_undefined(args[0])) {
1186 if (is_callable(outer_callback)) stream_call(js, outer_callback, stream_obj, &args[0], 1, false);
1187 return js_mkundef();
1188 }
1189
1190 if (nargs > 1 && !is_null(args[1]) && !is_undefined(args[1]))
1191 stream_readable_push_value(js, stream_obj, args[1], js_mkundef());
1192
1193 if (is_callable(outer_callback)) stream_call(js, outer_callback, stream_obj, NULL, 0, false);
1194 return js_mkundef();
1195}
1196
1197static ant_value_t js_transform__write(ant_t *js, ant_value_t *args, int nargs) {
1198 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Transform");
1199 ant_value_t transform_fn = 0;
1200 ant_value_t cb_state = 0;
1201 ant_value_t cb = 0;
1202 ant_value_t call_args[3];
1203
1204 if (is_err(stream_obj)) return stream_obj;
1205 transform_fn = js_getprop_fallback(js, stream_obj, "_transform");
1206
1207 cb_state = js_mkobj(js);
1208 js_set(js, cb_state, "stream", stream_obj);
1209 js_set(js, cb_state, "callback", nargs > 2 ? args[2] : js_mkundef());
1210 cb = js_heavy_mkfun(js, stream_transform_write_callback, cb_state);
1211
1212 call_args[0] = nargs > 0 ? args[0] : js_mkundef();
1213 call_args[1] = nargs > 1 ? args[1] : js_mkstr(js, "utf8", 4);
1214 call_args[2] = cb;
1215
1216 return stream_call(js, transform_fn, stream_obj, call_args, 3, false);
1217}
1218
1219static ant_value_t stream_transform_final_callback(ant_t *js, ant_value_t *args, int nargs) {
1220 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
1221 ant_value_t stream_obj = js_get(js, state_obj, "stream");
1222 ant_value_t callback = js_get(js, state_obj, "callback");
1223
1224 if (nargs > 0 && !is_null(args[0]) && !is_undefined(args[0])) {
1225 if (is_callable(callback)) stream_call(js, callback, stream_obj, &args[0], 1, false);
1226 return js_mkundef();
1227 }
1228
1229 if (nargs > 1 && !is_null(args[1]) && !is_undefined(args[1]))
1230 stream_readable_push_value(js, stream_obj, args[1], js_mkundef());
1231 stream_readable_push_value(js, stream_obj, js_mknull(), js_mkundef());
1232 if (is_callable(callback)) stream_call(js, callback, stream_obj, NULL, 0, false);
1233
1234 return js_mkundef();
1235}
1236
1237static ant_value_t js_transform__final(ant_t *js, ant_value_t *args, int nargs) {
1238 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Transform");
1239 ant_value_t flush_fn = 0;
1240 ant_value_t cb_state = 0;
1241 ant_value_t cb = 0;
1242 ant_value_t call_args[1];
1243
1244 if (is_err(stream_obj)) return stream_obj;
1245 flush_fn = js_getprop_fallback(js, stream_obj, "_flush");
1246
1247 if (!is_callable(flush_fn)) {
1248 stream_readable_push_value(js, stream_obj, js_mknull(), js_mkundef());
1249 stream_call_callback(js, nargs > 0 ? args[0] : js_mkundef(), NULL, 0);
1250 return js_mkundef();
1251 }
1252
1253 cb_state = js_mkobj(js);
1254 js_set(js, cb_state, "stream", stream_obj);
1255 js_set(js, cb_state, "callback", nargs > 0 ? args[0] : js_mkundef());
1256 cb = js_heavy_mkfun(js, stream_transform_final_callback, cb_state);
1257 call_args[0] = cb;
1258 return stream_call(js, flush_fn, stream_obj, call_args, 1, false);
1259}
1260
1261static ant_value_t js_passthrough__transform(ant_t *js, ant_value_t *args, int nargs) {
1262 return js_transform__transform(js, args, nargs);
1263}
1264
1265static ant_value_t stream_finished_cleanup(ant_t *js, ant_value_t state_obj) {
1266 ant_value_t stream_obj = js_get(js, state_obj, "stream");
1267 if (stream_is_instance(stream_obj) || is_object_type(stream_obj)) {
1268 stream_remove_listener(js, stream_obj, "end", js_get(js, state_obj, "onFinish"));
1269 stream_remove_listener(js, stream_obj, "finish", js_get(js, state_obj, "onFinish"));
1270 stream_remove_listener(js, stream_obj, "close", js_get(js, state_obj, "onFinish"));
1271 stream_remove_listener(js, stream_obj, "error", js_get(js, state_obj, "onError"));
1272 }
1273 return js_mkundef();
1274}
1275
1276static ant_value_t stream_finished_fire(ant_t *js, ant_value_t state_obj, ant_value_t error) {
1277 ant_value_t called = js_get(js, state_obj, "called");
1278 ant_value_t callback = js_get(js, state_obj, "callback");
1279 ant_value_t cb_args[1];
1280
1281 if (js_truthy(js, called)) return js_mkundef();
1282 js_set(js, state_obj, "called", js_true);
1283 stream_finished_cleanup(js, state_obj);
1284
1285 if (is_undefined(error)) stream_call_callback(js, callback, NULL, 0);
1286 else {
1287 cb_args[0] = error;
1288 stream_call_callback(js, callback, cb_args, 1);
1289 }
1290 return js_mkundef();
1291}
1292
1293static ant_value_t stream_finished_on_finish(ant_t *js, ant_value_t *args, int nargs) {
1294 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
1295 return stream_finished_fire(js, state_obj, js_mkundef());
1296}
1297
1298static ant_value_t stream_finished_on_error(ant_t *js, ant_value_t *args, int nargs) {
1299 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
1300 ant_value_t error = nargs > 0 ? args[0] : js_mkundef();
1301 return stream_finished_fire(js, state_obj, error);
1302}
1303
1304static ant_value_t stream_finished_register(ant_t *js, ant_value_t stream_obj, ant_value_t callback) {
1305 ant_value_t state_obj = js_mkobj(js);
1306 ant_value_t on_finish = 0;
1307 ant_value_t on_error = 0;
1308
1309 if (!is_callable(callback)) callback = js_mkfun(stream_noop);
1310
1311 js_set(js, state_obj, "stream", stream_obj);
1312 js_set(js, state_obj, "callback", callback);
1313 js_set(js, state_obj, "called", js_false);
1314 on_finish = js_heavy_mkfun(js, stream_finished_on_finish, state_obj);
1315 on_error = js_heavy_mkfun(js, stream_finished_on_error, state_obj);
1316 js_set(js, state_obj, "onFinish", on_finish);
1317 js_set(js, state_obj, "onError", on_error);
1318
1319 eventemitter_add_listener(js, stream_obj, "end", on_finish, false);
1320 eventemitter_add_listener(js, stream_obj, "finish", on_finish, false);
1321 eventemitter_add_listener(js, stream_obj, "close", on_finish, false);
1322 eventemitter_add_listener(js, stream_obj, "error", on_error, false);
1323 return stream_obj;
1324}
1325
1326static ant_value_t js_stream_finished(ant_t *js, ant_value_t *args, int nargs) {
1327 ant_value_t callback = nargs > 1 ? args[1] : js_mkundef();
1328 if (nargs < 1 || !is_object_type(args[0])) return js_mkerr(js, "finished requires a stream");
1329 return stream_finished_register(js, args[0], callback);
1330}
1331
1332static ant_value_t stream_pipeline_done(ant_t *js, ant_value_t *args, int nargs) {
1333 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
1334 ant_value_t callback = js_get(js, state_obj, "callback");
1335 ant_value_t called = js_get(js, state_obj, "called");
1336 ant_value_t cb_args[1];
1337
1338 if (js_truthy(js, called)) return js_mkundef();
1339 js_set(js, state_obj, "called", js_true);
1340
1341 if (nargs > 0 && !is_undefined(args[0])) {
1342 cb_args[0] = args[0];
1343 stream_call_callback(js, callback, cb_args, 1);
1344 } else stream_call_callback(js, callback, NULL, 0);
1345 return js_mkundef();
1346}
1347
1348static ant_value_t stream_pipeline_error(ant_t *js, ant_value_t *args, int nargs) {
1349 ant_value_t done = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
1350 if (nargs > 0 && !is_undefined(args[0])) stream_call_callback(js, done, &args[0], 1);
1351 return js_mkundef();
1352}
1353
1354static ant_value_t stream_pipeline_schedule_done(ant_t *js, ant_value_t *args, int nargs) {
1355 ant_value_t done = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
1356 stream_call_callback(js, done, NULL, 0);
1357 return js_mkundef();
1358}
1359
1360static ant_value_t js_stream_pipeline(ant_t *js, ant_value_t *args, int nargs) {
1361 int stream_count = nargs;
1362 ant_value_t callback = js_mkundef();
1363 ant_value_t done_state = 0;
1364 ant_value_t done = 0;
1365
1366 if (nargs > 0 && is_callable(args[nargs - 1])) {
1367 callback = args[nargs - 1];
1368 stream_count--;
1369 }
1370
1371 if (!is_callable(callback)) callback = js_mkfun(stream_noop);
1372 if (stream_count <= 0) return js_mkundef();
1373
1374 done_state = js_mkobj(js);
1375 js_set(js, done_state, "callback", callback);
1376 js_set(js, done_state, "called", js_false);
1377 done = js_heavy_mkfun(js, stream_pipeline_done, done_state);
1378
1379 if (stream_count < 2) {
1380 stream_schedule_microtask(js, stream_pipeline_schedule_done, done);
1381 return args[0];
1382 }
1383
1384 for (int i = 0; i < stream_count - 1; i++) {
1385 ant_value_t error_cb = js_heavy_mkfun(js, stream_pipeline_error, done);
1386 ant_value_t finished_args[2];
1387
1388 finished_args[0] = args[i];
1389 finished_args[1] = error_cb;
1390 js_stream_finished(js, finished_args, 2);
1391
1392 ant_value_t pipe_args[2];
1393 pipe_args[0] = args[i + 1];
1394 pipe_args[1] = js_mkundef();
1395 stream_call_prop(js, args[i], "pipe", pipe_args, 2);
1396 }
1397
1398 {
1399 ant_value_t finished_args[2];
1400 finished_args[0] = args[stream_count - 1];
1401 finished_args[1] = done;
1402 js_stream_finished(js, finished_args, 2);
1403 }
1404
1405 return args[stream_count - 1];
1406}
1407
1408static ant_value_t stream_promise_callback(ant_t *js, ant_value_t *args, int nargs) {
1409 ant_value_t promise = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
1410 if (nargs > 0 && !is_undefined(args[0])) js_reject_promise(js, promise, args[0]);
1411 else js_resolve_promise(js, promise, js_mkundef());
1412 return js_mkundef();
1413}
1414
1415static ant_value_t js_stream_promises_finished(ant_t *js, ant_value_t *args, int nargs) {
1416 ant_value_t promise = js_mkpromise(js);
1417 ant_value_t finished_args[2];
1418 if (nargs < 1 || !is_object_type(args[0])) {
1419 js_reject_promise(js, promise, js_mkerr(js, "finished requires a stream"));
1420 return promise;
1421 }
1422 finished_args[0] = args[0];
1423 finished_args[1] = js_heavy_mkfun(js, stream_promise_callback, promise);
1424 js_stream_finished(js, finished_args, 2);
1425 return promise;
1426}
1427
1428static ant_value_t js_stream_promises_pipeline(ant_t *js, ant_value_t *args, int nargs) {
1429 ant_value_t promise = js_mkpromise(js);
1430 ant_value_t *call_args = NULL;
1431
1432 if (nargs <= 0) {
1433 js_resolve_promise(js, promise, js_mkundef());
1434 return promise;
1435 }
1436
1437 call_args = malloc((size_t)(nargs + 1) * sizeof(*call_args));
1438 if (!call_args) {
1439 js_reject_promise(js, promise, js_mkerr(js, "out of memory"));
1440 return promise;
1441 }
1442
1443 for (int i = 0; i < nargs; i++) call_args[i] = args[i];
1444 call_args[nargs] = js_heavy_mkfun(js, stream_promise_callback, promise);
1445 js_stream_pipeline(js, call_args, nargs + 1);
1446 free(call_args);
1447 return promise;
1448}
1449
1450static void stream_release_reader(ant_t *js, ant_value_t state_obj) {
1451 ant_value_t reader = js_get(js, state_obj, "reader");
1452 if (!is_object_type(reader)) return;
1453 stream_call_prop(js, reader, "releaseLock", NULL, 0);
1454}
1455
1456static ant_value_t stream_readable_from_step(ant_t *js, ant_value_t *args, int nargs);
1457
1458static void stream_readable_from_schedule(ant_t *js, ant_value_t state_obj) {
1459 stream_schedule_microtask(js, stream_readable_from_step, state_obj);
1460}
1461
1462static ant_value_t stream_readable_from_fail(ant_t *js, ant_value_t state_obj, ant_value_t error) {
1463 ant_value_t readable = js_get(js, state_obj, "readable");
1464 stream_release_reader(js, state_obj);
1465 if (stream_is_instance(readable)) {
1466 ant_value_t destroy_args[1] = { error };
1467 ant_value_t saved_this = js->this_val;
1468 js->this_val = readable;
1469 js_stream_destroy(js, destroy_args, 1);
1470 js->this_val = saved_this;
1471 }
1472 return js_mkundef();
1473}
1474
1475static ant_value_t stream_readable_from_handle_result(ant_t *js, ant_value_t *args, int nargs) {
1476 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
1477 ant_value_t readable = js_get(js, state_obj, "readable");
1478 ant_value_t result = nargs > 0 ? args[0] : js_mkundef();
1479 ant_value_t done = 0;
1480 ant_value_t value = 0;
1481
1482 if (js_truthy(js, js_get(js, readable, "destroyed"))) {
1483 stream_release_reader(js, state_obj);
1484 return js_mkundef();
1485 }
1486
1487 if (!is_object_type(result)) return stream_readable_from_fail(js, state_obj, js_mkerr(js, "iterator step must be an object"));
1488 done = js_get(js, result, "done");
1489 value = js_get(js, result, "value");
1490 if (js_truthy(js, done)) {
1491 stream_release_reader(js, state_obj);
1492 stream_readable_push_value(js, readable, js_mknull(), js_mkundef());
1493 return js_mkundef();
1494 }
1495
1496 stream_readable_push_value(js, readable, value, js_mkundef());
1497 if (!js_truthy(js, js_get(js, readable, "destroyed"))) stream_readable_from_schedule(js, state_obj);
1498 return js_mkundef();
1499}
1500
1501static ant_value_t stream_readable_from_reject(ant_t *js, ant_value_t *args, int nargs) {
1502 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
1503 ant_value_t error = nargs > 0 ? args[0] : js_mkerr(js, "stream iteration failed");
1504 return stream_readable_from_fail(js, state_obj, error);
1505}
1506
1507static ant_value_t stream_readable_from_step(ant_t *js, ant_value_t *args, int nargs) {
1508 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
1509 ant_value_t readable = js_get(js, state_obj, "readable");
1510 ant_value_t mode = js_get(js, state_obj, "mode");
1511 ant_value_t iterator = js_get(js, state_obj, "iterator");
1512
1513 ant_value_t next_result = js_mkundef();
1514 ant_value_t on_resolve = js_heavy_mkfun(js, stream_readable_from_handle_result, state_obj);
1515 ant_value_t on_reject = js_heavy_mkfun(js, stream_readable_from_reject, state_obj);
1516 ant_value_t then_result = 0;
1517
1518 if (js_truthy(js, js_get(js, readable, "destroyed"))) {
1519 stream_release_reader(js, state_obj);
1520 return js_mkundef();
1521 }
1522
1523 if (stream_key_is_cstr(js, mode, "reader")) next_result = stream_call_prop(js, iterator, "read", NULL, 0);
1524 else next_result = stream_call_prop(js, iterator, "next", NULL, 0);
1525
1526 if (is_err(next_result)) return stream_readable_from_fail(js, state_obj, next_result);
1527 if (vtype(next_result) == T_PROMISE) {
1528 then_result = js_promise_then(js, next_result, on_resolve, on_reject);
1529 promise_mark_handled(then_result);
1530 return js_mkundef();
1531 }
1532
1533 ant_value_t one_arg[1] = { next_result };
1534 return stream_readable_from_handle_result(js, one_arg, 1);
1535}
1536
1537static ant_value_t stream_readable_from_start(ant_t *js, ant_value_t *args, int nargs) {
1538 ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
1539 ant_value_t readable = js_get(js, state_obj, "readable");
1540 ant_value_t source = js_get(js, state_obj, "source");
1541
1542 ant_value_t async_iter_fn = 0;
1543 ant_value_t reader_fn = 0;
1544 js_iter_t it;
1545
1546 if (js_truthy(js, js_get(js, readable, "destroyed"))) return js_mkundef();
1547
1548 async_iter_fn = is_object_type(source) ? js_get_sym(js, source, get_asyncIterator_sym()) : js_mkundef();
1549 if (is_callable(async_iter_fn)) {
1550 ant_value_t iterator = stream_call(js, async_iter_fn, source, NULL, 0, false);
1551 if (is_err(iterator)) return stream_readable_from_fail(js, state_obj, iterator);
1552 js_set(js, state_obj, "iterator", iterator);
1553 js_set(js, state_obj, "mode", js_mkstr(js, "async", 5));
1554 stream_readable_from_schedule(js, state_obj);
1555 return js_mkundef();
1556 }
1557
1558 if (js_iter_open(js, source, &it)) {
1559 ant_value_t value = 0;
1560 while (js_iter_next(js, &it, &value)) {
1561 if (js_truthy(js, js_get(js, readable, "destroyed"))) break;
1562 stream_readable_push_value(js, readable, value, js_mkundef());
1563 }
1564 js_iter_close(js, &it);
1565 if (!js_truthy(js, js_get(js, readable, "destroyed")))
1566 stream_readable_push_value(js, readable, js_mknull(), js_mkundef());
1567 return js_mkundef();
1568 }
1569
1570 reader_fn = is_object_type(source) ? js_get(js, source, "getReader") : js_mkundef();
1571 if (is_callable(reader_fn)) {
1572 ant_value_t reader = stream_call(js, reader_fn, source, NULL, 0, false);
1573 if (is_err(reader)) return stream_readable_from_fail(js, state_obj, reader);
1574 js_set(js, state_obj, "reader", reader);
1575 js_set(js, state_obj, "iterator", reader);
1576 js_set(js, state_obj, "mode", js_mkstr(js, "reader", 6));
1577 stream_readable_from_schedule(js, state_obj);
1578 return js_mkundef();
1579 }
1580
1581 if (!is_undefined(source)) stream_readable_push_value(js, readable, source, js_mkundef());
1582 stream_readable_push_value(js, readable, js_mknull(), js_mkundef());
1583
1584 return js_mkundef();
1585}
1586
1587static ant_value_t js_readable_from(ant_t *js, ant_value_t *args, int nargs) {
1588 ant_value_t ctor_args[1];
1589 ant_value_t readable = 0;
1590 ant_value_t state_obj = 0;
1591
1592 ctor_args[0] = nargs > 1 ? args[1] : js_mkundef();
1593 readable = stream_construct(js, g_readable_proto, ctor_args[0], stream_init_readable);
1594 if (is_err(readable)) return readable;
1595
1596 state_obj = js_mkobj(js);
1597 js_set(js, state_obj, "readable", readable);
1598 js_set(js, state_obj, "source", nargs > 0 ? args[0] : js_mkundef());
1599 js_set(js, state_obj, "iterator", js_mkundef());
1600 js_set(js, state_obj, "reader", js_mkundef());
1601 js_set(js, state_obj, "mode", js_mkundef());
1602 stream_schedule_microtask(js, stream_readable_from_start, state_obj);
1603
1604 return readable;
1605}
1606
1607static ant_value_t js_readable_from_web(ant_t *js, ant_value_t *args, int nargs) {
1608 return js_readable_from(js, args, nargs);
1609}
1610
1611static ant_value_t js_stream_ctor(ant_t *js, ant_value_t *args, int nargs) {
1612 return stream_construct(js, g_stream_proto, nargs > 0 ? args[0] : js_mkundef(), stream_init_base);
1613}
1614
1615static ant_value_t js_readable_ctor(ant_t *js, ant_value_t *args, int nargs) {
1616 return stream_construct(js, g_readable_proto, nargs > 0 ? args[0] : js_mkundef(), stream_init_readable);
1617}
1618
1619static ant_value_t js_writable_ctor(ant_t *js, ant_value_t *args, int nargs) {
1620 return stream_construct(js, g_writable_proto, nargs > 0 ? args[0] : js_mkundef(), stream_init_writable);
1621}
1622
1623static ant_value_t js_duplex_ctor(ant_t *js, ant_value_t *args, int nargs) {
1624 ant_value_t proto = js_instance_proto_from_new_target(js, g_duplex_proto);
1625 ant_value_t obj = stream_make_base_object(js, is_object_type(proto) ? proto : g_duplex_proto);
1626 ant_value_t options = nargs > 0 ? args[0] : js_mkundef();
1627 ant_value_t options_obj = is_object_type(options) ? options : js_mkobj(js);
1628
1629 stream_init_readable(js, obj, options);
1630 stream_init_writable(js, obj, options);
1631
1632 js_set(js, obj, "readable", js_true);
1633 js_set(js, obj, "writable", js_true);
1634 js_set(js, obj, "allowHalfOpen", js_bool(stream_get_option(js, options_obj, "allowHalfOpen") != js_false));
1635
1636 return obj;
1637}
1638
1639static ant_value_t js_transform_ctor(ant_t *js, ant_value_t *args, int nargs) {
1640 ant_value_t proto = js_instance_proto_from_new_target(js, g_transform_proto);
1641 ant_value_t obj = stream_make_base_object(js, is_object_type(proto) ? proto : g_transform_proto);
1642 ant_value_t options = nargs > 0 ? args[0] : js_mkundef();
1643 ant_value_t options_obj = is_object_type(options) ? options : js_mkobj(js);
1644
1645 ant_value_t transform_fn = 0;
1646 ant_value_t flush_fn = 0;
1647
1648 stream_init_readable(js, obj, options);
1649 stream_init_writable(js, obj, options);
1650
1651 js_set(js, obj, "readable", js_true);
1652 js_set(js, obj, "writable", js_true);
1653 js_set(js, obj, "allowHalfOpen", js_bool(stream_get_option(js, options_obj, "allowHalfOpen") != js_false));
1654
1655 transform_fn = stream_get_option(js, options_obj, "transform");
1656 flush_fn = stream_get_option(js, options_obj, "flush");
1657
1658 if (is_callable(transform_fn)) js_set(js, obj, "_transform", transform_fn);
1659 if (is_callable(flush_fn)) js_set(js, obj, "_flush", flush_fn);
1660
1661 return obj;
1662}
1663
1664static ant_value_t js_passthrough_ctor(ant_t *js, ant_value_t *args, int nargs) {
1665 ant_value_t proto = js_instance_proto_from_new_target(js, g_passthrough_proto);
1666 ant_value_t obj = stream_make_base_object(js, is_object_type(proto) ? proto : g_passthrough_proto);
1667 ant_value_t options = nargs > 0 ? args[0] : js_mkundef();
1668 ant_value_t options_obj = is_object_type(options) ? options : js_mkobj(js);
1669
1670 stream_init_readable(js, obj, options);
1671 stream_init_writable(js, obj, options);
1672
1673 js_set(js, obj, "readable", js_true);
1674 js_set(js, obj, "writable", js_true);
1675 js_set(js, obj, "allowHalfOpen", js_bool(stream_get_option(js, options_obj, "allowHalfOpen") != js_false));
1676
1677 return obj;
1678}
1679
1680void stream_init_constructors(ant_t *js) {
1681 ant_value_t events = 0;
1682 ant_value_t ee_ctor = 0;
1683 ant_value_t ee_proto = 0;
1684
1685 if (g_stream_ctor) return;
1686
1687 events = events_library(js);
1688 ee_ctor = js_get(js, events, "EventEmitter");
1689 ee_proto = js_get(js, ee_ctor, "prototype");
1690
1691 g_stream_proto = js_mkobj(js);
1692 js_set_proto_init(g_stream_proto, ee_proto);
1693 js_set(js, g_stream_proto, "pipe", js_mkfun(js_stream_pipe));
1694 js_set(js, g_stream_proto, "unpipe", js_mkfun(js_stream_unpipe));
1695 js_set(js, g_stream_proto, "pause", js_mkfun(js_stream_pause));
1696 js_set(js, g_stream_proto, "resume", js_mkfun(js_stream_resume));
1697 js_set(js, g_stream_proto, "isPaused", js_mkfun(js_stream_is_paused));
1698 js_set(js, g_stream_proto, "destroy", js_mkfun(js_stream_destroy));
1699 js_set_sym(js, g_stream_proto, get_toStringTag_sym(), js_mkstr(js, "Stream", 6));
1700 g_stream_ctor = js_make_ctor(js, js_stream_ctor, g_stream_proto, "Stream", 6);
1701
1702 g_readable_proto = js_mkobj(js);
1703 js_set_proto_init(g_readable_proto, g_stream_proto);
1704 js_set(js, g_readable_proto, "_read", js_mkfun(js_readable__read));
1705 js_set(js, g_readable_proto, "push", js_mkfun(js_readable_push));
1706 js_set(js, g_readable_proto, "read", js_mkfun(js_readable_read));
1707 js_set(js, g_readable_proto, "setEncoding", js_mkfun(js_readable_set_encoding));
1708 js_set(js, g_readable_proto, "on", js_mkfun(js_readable_on));
1709 js_set(js, g_readable_proto, "resume", js_mkfun(js_readable_resume));
1710 js_set(js, g_readable_proto, "pause", js_mkfun(js_readable_pause));
1711 js_set_sym(js, g_readable_proto, get_toStringTag_sym(), js_mkstr(js, "Readable", 8));
1712 g_readable_ctor = js_make_ctor(js, js_readable_ctor, g_readable_proto, "Readable", 8);
1713 js_set(js, g_readable_ctor, "from", js_mkfun(js_readable_from));
1714 js_set(js, g_readable_ctor, "fromWeb", js_mkfun(js_readable_from_web));
1715
1716 g_writable_proto = js_mkobj(js);
1717 js_set_proto_init(g_writable_proto, g_stream_proto);
1718 js_set(js, g_writable_proto, "_write", js_mkfun(js_writable__write));
1719 js_set(js, g_writable_proto, "_final", js_mkfun(js_writable__final));
1720 js_set(js, g_writable_proto, "write", js_mkfun(js_writable_write));
1721 js_set(js, g_writable_proto, "end", js_mkfun(js_writable_end));
1722 js_set(js, g_writable_proto, "cork", js_mkfun(stream_noop));
1723 js_set(js, g_writable_proto, "uncork", js_mkfun(stream_noop));
1724 js_set_sym(js, g_writable_proto, get_toStringTag_sym(), js_mkstr(js, "Writable", 8));
1725 g_writable_ctor = js_make_ctor(js, js_writable_ctor, g_writable_proto, "Writable", 8);
1726
1727 g_duplex_proto = js_mkobj(js);
1728 js_set_proto_init(g_duplex_proto, g_readable_proto);
1729 js_set(js, g_duplex_proto, "_write", js_mkfun(js_writable__write));
1730 js_set(js, g_duplex_proto, "_final", js_mkfun(js_writable__final));
1731 js_set(js, g_duplex_proto, "write", js_mkfun(js_writable_write));
1732 js_set(js, g_duplex_proto, "end", js_mkfun(js_writable_end));
1733 js_set(js, g_duplex_proto, "cork", js_mkfun(stream_noop));
1734 js_set(js, g_duplex_proto, "uncork", js_mkfun(stream_noop));
1735 js_set_sym(js, g_duplex_proto, get_toStringTag_sym(), js_mkstr(js, "Duplex", 6));
1736 g_duplex_ctor = js_make_ctor(js, js_duplex_ctor, g_duplex_proto, "Duplex", 6);
1737
1738 g_transform_proto = js_mkobj(js);
1739 js_set_proto_init(g_transform_proto, g_duplex_proto);
1740 js_set(js, g_transform_proto, "_transform", js_mkfun(js_transform__transform));
1741 js_set(js, g_transform_proto, "_write", js_mkfun(js_transform__write));
1742 js_set(js, g_transform_proto, "_final", js_mkfun(js_transform__final));
1743 js_set_sym(js, g_transform_proto, get_toStringTag_sym(), js_mkstr(js, "Transform", 9));
1744 g_transform_ctor = js_make_ctor(js, js_transform_ctor, g_transform_proto, "Transform", 9);
1745
1746 g_passthrough_proto = js_mkobj(js);
1747 js_set_proto_init(g_passthrough_proto, g_transform_proto);
1748 js_set(js, g_passthrough_proto, "_transform", js_mkfun(js_passthrough__transform));
1749 js_set_sym(js, g_passthrough_proto, get_toStringTag_sym(), js_mkstr(js, "PassThrough", 11));
1750 g_passthrough_ctor = js_make_ctor(js, js_passthrough_ctor, g_passthrough_proto, "PassThrough", 11);
1751
1752 gc_register_root(&g_stream_proto);
1753 gc_register_root(&g_stream_ctor);
1754 gc_register_root(&g_readable_proto);
1755 gc_register_root(&g_readable_ctor);
1756 gc_register_root(&g_writable_proto);
1757 gc_register_root(&g_writable_ctor);
1758 gc_register_root(&g_duplex_proto);
1759 gc_register_root(&g_duplex_ctor);
1760 gc_register_root(&g_transform_proto);
1761 gc_register_root(&g_transform_ctor);
1762 gc_register_root(&g_passthrough_proto);
1763 gc_register_root(&g_passthrough_ctor);
1764}
1765
1766ant_value_t stream_readable_constructor(ant_t *js) {
1767 stream_init_constructors(js);
1768 return g_readable_ctor;
1769}
1770
1771ant_value_t stream_writable_constructor(ant_t *js) {
1772 stream_init_constructors(js);
1773 return g_writable_ctor;
1774}
1775
1776ant_value_t stream_readable_prototype(ant_t *js) {
1777 stream_init_constructors(js);
1778 return g_readable_proto;
1779}
1780
1781ant_value_t stream_writable_prototype(ant_t *js) {
1782 stream_init_constructors(js);
1783 return g_writable_proto;
1784}
1785
1786ant_value_t stream_duplex_prototype(ant_t *js) {
1787 stream_init_constructors(js);
1788 return g_duplex_proto;
1789}
1790
1791ant_value_t stream_construct_readable(ant_t *js, ant_value_t base_proto, ant_value_t options) {
1792 stream_init_constructors(js);
1793 return stream_construct(js, base_proto, options, stream_init_readable);
1794}
1795
1796ant_value_t stream_construct_writable(ant_t *js, ant_value_t base_proto, ant_value_t options) {
1797 stream_init_constructors(js);
1798 return stream_construct(js, base_proto, options, stream_init_writable);
1799}
1800
1801void stream_init_readable_object(ant_t *js, ant_value_t obj, ant_value_t options) {
1802 stream_init_constructors(js);
1803 if (!is_object_type(obj)) return;
1804 js_set_native_tag(obj, STREAM_NATIVE_TAG);
1805 stream_init_readable(js, obj, options);
1806}
1807
1808void stream_init_writable_object(ant_t *js, ant_value_t obj, ant_value_t options) {
1809 stream_init_constructors(js);
1810 if (!is_object_type(obj)) return;
1811 js_set_native_tag(obj, STREAM_NATIVE_TAG);
1812 stream_init_writable(js, obj, options);
1813}
1814
1815void stream_init_duplex_object(ant_t *js, ant_value_t obj, ant_value_t options) {
1816 stream_init_constructors(js);
1817 if (!is_object_type(obj)) return;
1818 js_set_native_tag(obj, STREAM_NATIVE_TAG);
1819 stream_init_readable(js, obj, options);
1820 stream_init_writable(js, obj, options);
1821}
1822
1823ant_value_t stream_readable_push(ant_t *js, ant_value_t stream_obj, ant_value_t chunk, ant_value_t encoding) {
1824 stream_init_constructors(js);
1825 return stream_readable_push_value(js, stream_obj, chunk, encoding);
1826}
1827
1828static ant_value_t js_stream_get_default_high_water_mark(ant_t *js, ant_value_t *args, int nargs) {
1829 bool object_mode = nargs > 0 && js_truthy(js, args[0]);
1830 return js_mknum(stream_default_high_water_mark(object_mode));
1831}
1832
1833static ant_value_t js_stream_set_default_high_water_mark(ant_t *js, ant_value_t *args, int nargs) {
1834 if (nargs < 2 || vtype(args[1]) != T_NUM || js_getnum(args[1]) < 0)
1835 return js_mkerr_typed(js, JS_ERR_RANGE, "setDefaultHighWaterMark requires a non-negative number");
1836
1837 bool object_mode = js_truthy(js, args[0]);
1838 if (object_mode) g_default_object_high_water_mark = js_getnum(args[1]);
1839 else g_default_high_water_mark = js_getnum(args[1]);
1840
1841 return js_mkundef();
1842}
1843
1844static void stream_web_copy_global(ant_t *js, ant_value_t obj, const char *name) {
1845 ant_value_t value = js_get(js, js->global, name);
1846 if (is_err(value)) return;
1847 js_set(js, obj, name, value);
1848}
1849
1850// TODO: remove copy-on-start
1851static void stream_web_define_common(ant_t *js, ant_value_t obj) {
1852 stream_web_copy_global(js, obj, "ReadableStream");
1853 stream_web_copy_global(js, obj, "ReadableStreamDefaultReader");
1854 stream_web_copy_global(js, obj, "ReadableStreamDefaultController");
1855 stream_web_copy_global(js, obj, "WritableStream");
1856 stream_web_copy_global(js, obj, "WritableStreamDefaultWriter");
1857 stream_web_copy_global(js, obj, "WritableStreamDefaultController");
1858 stream_web_copy_global(js, obj, "TransformStream");
1859 stream_web_copy_global(js, obj, "TransformStreamDefaultController");
1860 stream_web_copy_global(js, obj, "ByteLengthQueuingStrategy");
1861 stream_web_copy_global(js, obj, "CountQueuingStrategy");
1862 stream_web_copy_global(js, obj, "TextEncoderStream");
1863 stream_web_copy_global(js, obj, "TextDecoderStream");
1864 stream_web_copy_global(js, obj, "CompressionStream");
1865 stream_web_copy_global(js, obj, "DecompressionStream");
1866}
1867
1868ant_value_t stream_library(ant_t *js) {
1869 ant_value_t lib = js_mkobj(js);
1870 ant_value_t promises = js_mkobj(js);
1871 stream_init_constructors(js);
1872
1873 js_set(js, promises, "pipeline", js_mkfun(js_stream_promises_pipeline));
1874 js_set(js, promises, "finished", js_mkfun(js_stream_promises_finished));
1875
1876 js_set_module_default(js, lib, g_stream_ctor, "Stream");
1877 js_set(js, lib, "Readable", g_readable_ctor);
1878 js_set(js, lib, "Writable", g_writable_ctor);
1879 js_set(js, lib, "Duplex", g_duplex_ctor);
1880 js_set(js, lib, "Transform", g_transform_ctor);
1881 js_set(js, lib, "PassThrough", g_passthrough_ctor);
1882 js_set(js, lib, "pipeline", js_mkfun(js_stream_pipeline));
1883 js_set(js, lib, "finished", js_mkfun(js_stream_finished));
1884 js_set(js, lib, "getDefaultHighWaterMark", js_mkfun(js_stream_get_default_high_water_mark));
1885 js_set(js, lib, "setDefaultHighWaterMark", js_mkfun(js_stream_set_default_high_water_mark));
1886 js_set(js, lib, "promises", promises);
1887
1888 js_set(js, g_stream_ctor, "Readable", g_readable_ctor);
1889 js_set(js, g_stream_ctor, "Writable", g_writable_ctor);
1890 js_set(js, g_stream_ctor, "Duplex", g_duplex_ctor);
1891 js_set(js, g_stream_ctor, "Transform", g_transform_ctor);
1892 js_set(js, g_stream_ctor, "PassThrough", g_passthrough_ctor);
1893 js_set(js, g_stream_ctor, "pipeline", js_get(js, lib, "pipeline"));
1894 js_set(js, g_stream_ctor, "finished", js_get(js, lib, "finished"));
1895 js_set(js, g_stream_ctor, "getDefaultHighWaterMark", js_get(js, lib, "getDefaultHighWaterMark"));
1896 js_set(js, g_stream_ctor, "setDefaultHighWaterMark", js_get(js, lib, "setDefaultHighWaterMark"));
1897 js_set(js, g_stream_ctor, "promises", promises);
1898
1899 js_set(js, promises, "default", promises);
1900 js_set_slot_wb(js, promises, SLOT_DEFAULT, promises);
1901 js_set_sym(js, lib, get_toStringTag_sym(), js_mkstr(js, "stream", 6));
1902
1903 return lib;
1904}
1905
1906ant_value_t stream_promises_library(ant_t *js) {
1907 ant_value_t stream_ns = js_esm_import_sync_cstr(js, "stream", 6);
1908 ant_value_t promises = 0;
1909
1910 if (is_err(stream_ns)) return stream_ns;
1911 promises = js_get(js, stream_ns, "promises");
1912
1913 if (!is_object_type(promises)) return js_mkerr(js, "stream.promises is not available");
1914 js_set(js, promises, "default", promises);
1915 js_set_slot_wb(js, promises, SLOT_DEFAULT, promises);
1916
1917 return promises;
1918}
1919
1920ant_value_t stream_web_library(ant_t *js) {
1921 ant_value_t lib = js_mkobj(js);
1922
1923 stream_web_define_common(js, lib);
1924 js_set(js, lib, "default", lib);
1925 js_set_slot_wb(js, lib, SLOT_DEFAULT, lib);
1926 js_set_sym(js, lib, get_toStringTag_sym(), js_mkstr(js, "stream/web", 10));
1927
1928 return lib;
1929}