MIRROR: javascript for 馃悳's, a tiny runtime with big ambitions
1#include <stdlib.h>
2#include <string.h>
3
4#include "ant.h"
5#include "errors.h"
6#include "runtime.h"
7#include "internal.h"
8#include "descriptors.h"
9
10#include "gc/roots.h"
11#include "silver/engine.h"
12#include "modules/symbol.h"
13#include "modules/assert.h"
14#include "modules/abort.h"
15#include "streams/writable.h"
16
17ant_value_t g_ws_proto;
18ant_value_t g_ws_writer_proto;
19ant_value_t g_ws_controller_proto;
20static ant_value_t g_close_sentinel;
21
22bool ws_is_stream(ant_value_t obj) {
23 return js_check_brand(obj, BRAND_WRITABLE_STREAM)
24 && ws_get_stream(obj) != NULL;
25}
26
27bool ws_is_writer(ant_value_t obj) {
28 return js_check_brand(obj, BRAND_WRITABLE_STREAM_WRITER)
29 && vtype(js_get_slot(obj, SLOT_RS_CLOSED)) == T_PROMISE
30 && vtype(js_get_slot(obj, SLOT_WS_READY)) == T_PROMISE;
31}
32
33bool ws_is_controller(ant_value_t obj) {
34 return js_check_brand(obj, BRAND_WRITABLE_STREAM_CONTROLLER)
35 && ws_get_controller(obj) != NULL
36 && ws_is_stream(js_get_slot(obj, SLOT_ENTRIES));
37}
38
39ws_stream_t *ws_get_stream(ant_value_t obj) {
40 ant_value_t s = js_get_slot(obj, SLOT_DATA);
41 if (vtype(s) != T_NUM) return NULL;
42 return (ws_stream_t *)(uintptr_t)(size_t)js_getnum(s);
43}
44
45ws_controller_t *ws_get_controller(ant_value_t obj) {
46 ant_value_t s = js_get_slot(obj, SLOT_DATA);
47 if (vtype(s) != T_NUM) return NULL;
48 return (ws_controller_t *)(uintptr_t)(size_t)js_getnum(s);
49}
50
51static void ws_stream_finalize(ant_t *js, ant_object_t *obj) {
52 if (!obj->extra_slots) return;
53 ant_extra_slot_t *entries = (ant_extra_slot_t *)obj->extra_slots;
54 for (uint8_t i = 0; i < obj->extra_count; i++) {
55 if (entries[i].slot == SLOT_DATA && vtype(entries[i].value) == T_NUM) {
56 free((ws_stream_t *)(uintptr_t)(size_t)js_getnum(entries[i].value));
57 return;
58 }}
59}
60
61static void ws_controller_finalize(ant_t *js, ant_object_t *obj) {
62 if (!obj->extra_slots) return;
63 ant_extra_slot_t *entries = (ant_extra_slot_t *)obj->extra_slots;
64 for (uint8_t i = 0; i < obj->extra_count; i++) {
65 if (entries[i].slot == SLOT_DATA && vtype(entries[i].value) == T_NUM) {
66 ws_controller_t *ctrl = (ws_controller_t *)(uintptr_t)(size_t)js_getnum(entries[i].value);
67 free(ctrl->queue_sizes);
68 free(ctrl);
69 return;
70 }}
71}
72
73ant_value_t ws_stream_controller(ant_value_t stream_obj) {
74 return js_get_slot(stream_obj, SLOT_ENTRIES);
75}
76
77ant_value_t ws_stream_writer(ant_value_t stream_obj) {
78 return js_get_slot(stream_obj, SLOT_CTOR);
79}
80
81static inline ant_value_t ws_stream_stored_error(ant_value_t stream_obj) {
82 return js_get_slot(stream_obj, SLOT_AUX);
83}
84
85static inline ant_value_t ws_stream_write_requests(ant_t *js, ant_value_t stream_obj) {
86 return js_get_slot(stream_obj, SLOT_SETTLED);
87}
88
89static inline ant_value_t ws_stream_in_flight_write(ant_value_t stream_obj) {
90 return js_get_slot(stream_obj, SLOT_DEFAULT);
91}
92
93static inline ant_value_t ws_stream_close_request(ant_value_t stream_obj) {
94 return js_get_slot(stream_obj, SLOT_WS_CLOSE);
95}
96
97static inline ant_value_t ws_stream_in_flight_close(ant_value_t stream_obj) {
98 return js_get_slot(stream_obj, SLOT_WS_ABORT);
99}
100
101static inline ant_value_t ws_stream_pending_abort_promise(ant_value_t stream_obj) {
102 return js_get_slot(stream_obj, SLOT_WS_READY);
103}
104
105static inline ant_value_t ws_ctrl_stream(ant_value_t ctrl_obj) {
106 return js_get_slot(ctrl_obj, SLOT_ENTRIES);
107}
108
109static inline ant_value_t ws_ctrl_write_fn(ant_value_t ctrl_obj) {
110 return js_get_slot(ctrl_obj, SLOT_WS_WRITE);
111}
112
113static inline ant_value_t ws_ctrl_close_fn(ant_value_t ctrl_obj) {
114 return js_get_slot(ctrl_obj, SLOT_WS_CLOSE);
115}
116
117static inline ant_value_t ws_ctrl_abort_fn(ant_value_t ctrl_obj) {
118 return js_get_slot(ctrl_obj, SLOT_WS_ABORT);
119}
120
121static inline ant_value_t ws_ctrl_size_fn(ant_value_t ctrl_obj) {
122 return js_get_slot(ctrl_obj, SLOT_RS_SIZE);
123}
124
125static inline ant_value_t ws_ctrl_sink(ant_value_t ctrl_obj) {
126 return js_get_slot(ctrl_obj, SLOT_CTOR);
127}
128
129static inline ant_value_t ws_ctrl_queue(ant_value_t ctrl_obj) {
130 return js_get_slot(ctrl_obj, SLOT_AUX);
131}
132
133static inline ant_value_t ws_ctrl_signal(ant_value_t ctrl_obj) {
134 return js_get_slot(ctrl_obj, SLOT_WS_SIGNAL);
135}
136
137static inline ant_value_t ws_writer_stream(ant_value_t writer_obj) {
138 return js_get_slot(writer_obj, SLOT_ENTRIES);
139}
140
141static inline ant_value_t ws_writer_closed(ant_value_t writer_obj) {
142 return js_get_slot(writer_obj, SLOT_RS_CLOSED);
143}
144
145ant_value_t ws_writer_ready(ant_value_t writer_obj) {
146 return js_get_slot(writer_obj, SLOT_WS_READY);
147}
148
149static void ws_ctrl_queue_push(ant_t *js, ant_value_t ctrl_obj, ant_value_t value) {
150 ant_value_t arr = ws_ctrl_queue(ctrl_obj);
151 if (vtype(arr) == T_ARR) js_arr_push(js, arr, value);
152}
153
154static ant_value_t ws_ctrl_queue_shift(ant_t *js, ant_value_t ctrl_obj) {
155 ant_value_t arr = ws_ctrl_queue(ctrl_obj);
156 if (vtype(arr) != T_ARR) return js_mkundef();
157 ant_object_t *aobj = js_obj_ptr(arr);
158 if (aobj->u.array.len == 0) return js_mkundef();
159 ant_value_t val = aobj->u.array.data[0];
160 uint32_t new_len = aobj->u.array.len - 1;
161 for (uint32_t i = 0; i < new_len; i++)
162 aobj->u.array.data[i] = aobj->u.array.data[i + 1];
163 aobj->u.array.len = new_len;
164 return val;
165}
166
167static ant_value_t ws_ctrl_queue_peek(ant_t *js, ant_value_t ctrl_obj) {
168 ant_value_t arr = ws_ctrl_queue(ctrl_obj);
169 if (vtype(arr) != T_ARR) return js_mkundef();
170 ant_object_t *aobj = js_obj_ptr(arr);
171 if (aobj->u.array.len == 0) return js_mkundef();
172 return aobj->u.array.data[0];
173}
174
175static bool ws_ctrl_queue_empty(ant_value_t ctrl_obj) {
176 ant_value_t arr = ws_ctrl_queue(ctrl_obj);
177 if (vtype(arr) != T_ARR) return true;
178 ant_object_t *aobj = js_obj_ptr(arr);
179 return aobj->u.array.len == 0;
180}
181
182static void ws_write_reqs_push(ant_t *js, ant_value_t stream_obj, ant_value_t promise) {
183 ant_value_t arr = ws_stream_write_requests(js, stream_obj);
184 if (vtype(arr) == T_ARR) js_arr_push(js, arr, promise);
185}
186
187static ant_value_t ws_write_reqs_shift(ant_t *js, ant_value_t stream_obj) {
188 ant_value_t arr = ws_stream_write_requests(js, stream_obj);
189 if (vtype(arr) != T_ARR) return js_mkundef();
190 ant_object_t *aobj = js_obj_ptr(arr);
191 if (aobj->u.array.len == 0) return js_mkundef();
192 ant_value_t val = aobj->u.array.data[0];
193 uint32_t new_len = aobj->u.array.len - 1;
194 for (uint32_t i = 0; i < new_len; i++)
195 aobj->u.array.data[i] = aobj->u.array.data[i + 1];
196 aobj->u.array.len = new_len;
197 return val;
198}
199
200static void ws_chain_promise(ant_t *js, ant_value_t val, ant_value_t res_fn, ant_value_t rej_fn) {
201 GC_ROOT_SAVE(root_mark, js);
202 GC_ROOT_PIN(js, val);
203 GC_ROOT_PIN(js, res_fn);
204 GC_ROOT_PIN(js, rej_fn);
205
206 ant_value_t promise = val;
207 GC_ROOT_PIN(js, promise);
208 if (vtype(promise) != T_PROMISE) {
209 promise = js_mkpromise(js);
210 GC_ROOT_PIN(js, promise);
211 js_resolve_promise(js, promise, val);
212 }
213
214 ant_value_t then_result = js_promise_then(js, promise, res_fn, rej_fn);
215 GC_ROOT_PIN(js, then_result);
216 promise_mark_handled(then_result);
217 GC_ROOT_RESTORE(js, root_mark);
218}
219
220static void ws_default_controller_clear_algorithms(ant_value_t ctrl_obj) {
221 js_set_slot(ctrl_obj, SLOT_WS_WRITE, js_mkundef());
222 js_set_slot(ctrl_obj, SLOT_WS_CLOSE, js_mkundef());
223 js_set_slot(ctrl_obj, SLOT_WS_ABORT, js_mkundef());
224 js_set_slot(ctrl_obj, SLOT_RS_SIZE, js_mkundef());
225}
226
227static double ws_default_controller_get_desired_size(ws_controller_t *ctrl) {
228 return ctrl->strategy_hwm - ctrl->queue_total_size;
229}
230
231static bool ws_default_controller_get_backpressure(ws_controller_t *ctrl) {
232 return ws_default_controller_get_desired_size(ctrl) <= 0;
233}
234
235bool writable_stream_close_queued_or_in_flight(ant_value_t stream_obj) {
236 ant_value_t cr = ws_stream_close_request(stream_obj);
237 ant_value_t icr = ws_stream_in_flight_close(stream_obj);
238 return !is_undefined(cr) || !is_undefined(icr);
239}
240
241static bool writable_stream_has_operation_in_flight(ant_value_t stream_obj) {
242 ant_value_t iw = ws_stream_in_flight_write(stream_obj);
243 ant_value_t ic = ws_stream_in_flight_close(stream_obj);
244 return !is_undefined(iw) || !is_undefined(ic);
245}
246
247static void ws_writer_replace_ready_promise_rejected(ant_t *js, ant_value_t writer_obj, ant_value_t error) {
248 ant_value_t ready = js_mkpromise(js);
249 js_reject_promise(js, ready, error);
250 promise_mark_handled(ready);
251 js_set_slot_wb(js, writer_obj, SLOT_WS_READY, ready);
252}
253
254static void ws_writer_replace_closed_promise_rejected(ant_t *js, ant_value_t writer_obj, ant_value_t error) {
255 ant_value_t closed = js_mkpromise(js);
256 js_reject_promise(js, closed, error);
257 promise_mark_handled(closed);
258 js_set_slot_wb(js, writer_obj, SLOT_RS_CLOSED, closed);
259}
260
261static void ws_writer_reject_ready_promise(ant_t *js, ant_value_t writer_obj, ant_value_t error) {
262 ant_value_t ready = ws_writer_ready(writer_obj);
263 if (!is_undefined(ready)) js_reject_promise(js, ready, error);
264}
265
266static void ws_writer_reject_closed_promise(ant_t *js, ant_value_t writer_obj, ant_value_t error) {
267 ant_value_t closed = ws_writer_closed(writer_obj);
268 if (!is_undefined(closed)) js_reject_promise(js, closed, error);
269}
270
271static void writable_stream_start_erroring(ant_t *js, ant_value_t stream_obj, ant_value_t reason) {
272 ws_stream_t *stream = ws_get_stream(stream_obj);
273 if (!stream || stream->state != WS_STATE_WRITABLE) return;
274
275 ant_value_t ctrl_obj = ws_stream_controller(stream_obj);
276 ws_controller_t *ctrl = ws_get_controller(ctrl_obj);
277
278 stream->state = WS_STATE_ERRORING;
279 js_set_slot_wb(js, stream_obj, SLOT_AUX, reason);
280
281 ant_value_t signal_ac = ws_ctrl_signal(ctrl_obj);
282 if (is_object_type(signal_ac)) {
283 ant_value_t signal = js_get(js, signal_ac, "signal");
284 if (abort_signal_is_signal(signal))
285 signal_do_abort(js, signal, reason);
286 }
287
288 ant_value_t writer_obj = ws_stream_writer(stream_obj);
289 if (ws_is_writer(writer_obj))
290 ws_writer_reject_ready_promise(js, writer_obj, reason);
291
292 if (!writable_stream_has_operation_in_flight(stream_obj) && ctrl && ctrl->started)
293 writable_stream_finish_erroring(js, stream_obj);
294}
295
296static void ws_reject_close_and_closed(ant_t *js, ant_value_t stream_obj) {
297 ant_value_t stored_error = ws_stream_stored_error(stream_obj);
298 ant_value_t cr = ws_stream_close_request(stream_obj);
299 if (!is_undefined(cr)) {
300 js_reject_promise(js, cr, stored_error);
301 js_set_slot(stream_obj, SLOT_WS_CLOSE, js_mkundef());
302 }
303 ant_value_t writer_obj = ws_stream_writer(stream_obj);
304 if (ws_is_writer(writer_obj))
305 ws_writer_reject_closed_promise(js, writer_obj, stored_error);
306}
307
308static ant_value_t ws_finish_erroring_abort_resolve(ant_t *js, ant_value_t *args, int nargs) {
309 ant_value_t wrapper = js_get_slot(js->current_func, SLOT_DATA);
310 ant_value_t p = js_get_slot(wrapper, SLOT_DATA);
311 ant_value_t stream_obj = js_get_slot(wrapper, SLOT_ENTRIES);
312 js_resolve_promise(js, p, js_mkundef());
313 ws_reject_close_and_closed(js, stream_obj);
314 return js_mkundef();
315}
316
317static ant_value_t ws_finish_erroring_abort_reject(ant_t *js, ant_value_t *args, int nargs) {
318 ant_value_t wrapper = js_get_slot(js->current_func, SLOT_DATA);
319 ant_value_t p = js_get_slot(wrapper, SLOT_DATA);
320 ant_value_t stream_obj = js_get_slot(wrapper, SLOT_ENTRIES);
321 ant_value_t reason = (nargs > 0) ? args[0] : js_mkundef();
322 js_reject_promise(js, p, reason);
323 ws_reject_close_and_closed(js, stream_obj);
324 return js_mkundef();
325}
326
327void writable_stream_finish_erroring(ant_t *js, ant_value_t stream_obj) {
328 ws_stream_t *stream = ws_get_stream(stream_obj);
329 if (!stream || stream->state != WS_STATE_ERRORING) return;
330 stream->state = WS_STATE_ERRORED;
331
332 ant_value_t ctrl_obj = ws_stream_controller(stream_obj);
333 ws_controller_t *ctrl = ws_get_controller(ctrl_obj);
334
335 ant_value_t saved_abort_fn = ws_ctrl_abort_fn(ctrl_obj);
336 ant_value_t saved_sink = ws_ctrl_sink(ctrl_obj);
337
338 if (ctrl) {
339 ctrl->queue_total_size = 0;
340 ctrl->queue_sizes_len = 0;
341 }
342 ws_default_controller_clear_algorithms(ctrl_obj);
343 ant_value_t stored_error = ws_stream_stored_error(stream_obj);
344
345 ant_value_t wr_arr = ws_stream_write_requests(js, stream_obj);
346 if (vtype(wr_arr) == T_ARR) {
347 ant_offset_t len = js_arr_len(js, wr_arr);
348 for (ant_offset_t i = 0; i < len; i++)
349 js_reject_promise(js, js_arr_get(js, wr_arr, i), stored_error);
350 ant_object_t *aobj = js_obj_ptr(wr_arr);
351 aobj->u.array.len = 0;
352 }
353
354 if (stream->has_pending_abort) {
355 stream->has_pending_abort = false;
356 ant_value_t abort_promise = ws_stream_pending_abort_promise(stream_obj);
357 js_set_slot(stream_obj, SLOT_WS_READY, js_mkundef());
358
359 if (stream->pending_abort_was_already_erroring) {
360 js_reject_promise(js, abort_promise, stored_error);
361 ws_reject_close_and_closed(js, stream_obj);
362 return;
363 }
364
365 ant_value_t result = js_mkundef();
366 if (is_callable(saved_abort_fn)) {
367 ant_value_t abort_args[1] = { stored_error };
368 result = sv_vm_call(js->vm, js, saved_abort_fn, saved_sink, abort_args, 1, NULL, false);
369 }
370
371 if (is_err(result)) {
372 ant_value_t thrown = js->thrown_value;
373 js_reject_promise(js, abort_promise, is_object_type(thrown) ? thrown : result);
374 ws_reject_close_and_closed(js, stream_obj);
375 } else {
376 ant_value_t wrapper = js_mkobj(js);
377 js_set_slot(wrapper, SLOT_DATA, abort_promise);
378 js_set_slot(wrapper, SLOT_ENTRIES, stream_obj);
379 ant_value_t res_fn = js_heavy_mkfun(js, ws_finish_erroring_abort_resolve, wrapper);
380 ant_value_t rej_fn = js_heavy_mkfun(js, ws_finish_erroring_abort_reject, wrapper);
381 ws_chain_promise(js, result, res_fn, rej_fn);
382 }
383
384 return;
385 }
386
387 ws_reject_close_and_closed(js, stream_obj);
388}
389
390static void writable_stream_deal_with_rejection(ant_t *js, ant_value_t stream_obj, ant_value_t error) {
391 ws_stream_t *stream = ws_get_stream(stream_obj);
392 if (!stream) return;
393 if (stream->state == WS_STATE_WRITABLE) {
394 writable_stream_start_erroring(js, stream_obj, error);
395 return;
396 }
397 writable_stream_finish_erroring(js, stream_obj);
398}
399
400static void writable_stream_update_backpressure(ant_t *js, ant_value_t stream_obj, bool backpressure) {
401 ws_stream_t *stream = ws_get_stream(stream_obj);
402 if (!stream) return;
403 ant_value_t writer_obj = ws_stream_writer(stream_obj);
404
405 if (ws_is_writer(writer_obj) && stream->backpressure != backpressure) {
406 if (backpressure) {
407 ant_value_t ready = js_mkpromise(js);
408 promise_mark_handled(ready);
409 js_set_slot_wb(js, writer_obj, SLOT_WS_READY, ready);
410 } else {
411 ant_value_t ready = ws_writer_ready(writer_obj);
412 if (!is_undefined(ready)) js_resolve_promise(js, ready, js_mkundef());
413 }}
414
415 stream->backpressure = backpressure;
416}
417
418static void writable_stream_finish_in_flight_write(ant_t *js, ant_value_t stream_obj) {
419 ant_value_t p = ws_stream_in_flight_write(stream_obj);
420 if (!is_undefined(p)) js_resolve_promise(js, p, js_mkundef());
421 js_set_slot(stream_obj, SLOT_DEFAULT, js_mkundef());
422}
423
424static void writable_stream_finish_in_flight_write_with_error(ant_t *js, ant_value_t stream_obj, ant_value_t error) {
425 ant_value_t p = ws_stream_in_flight_write(stream_obj);
426 if (!is_undefined(p)) js_reject_promise(js, p, error);
427 js_set_slot(stream_obj, SLOT_DEFAULT, js_mkundef());
428 writable_stream_deal_with_rejection(js, stream_obj, error);
429}
430
431static void writable_stream_finish_in_flight_close(ant_t *js, ant_value_t stream_obj) {
432 ant_value_t p = ws_stream_in_flight_close(stream_obj);
433 if (!is_undefined(p)) js_resolve_promise(js, p, js_mkundef());
434 js_set_slot(stream_obj, SLOT_WS_ABORT, js_mkundef());
435
436 ws_stream_t *stream = ws_get_stream(stream_obj);
437 if (!stream) return;
438
439 if (stream->state == WS_STATE_ERRORING) {
440 js_set_slot(stream_obj, SLOT_AUX, js_mkundef());
441 if (stream->has_pending_abort) {
442 ant_value_t ap = ws_stream_pending_abort_promise(stream_obj);
443 js_resolve_promise(js, ap, js_mkundef());
444 stream->has_pending_abort = false;
445 js_set_slot(stream_obj, SLOT_WS_READY, js_mkundef());
446 }
447 }
448 stream->state = WS_STATE_CLOSED;
449
450 ant_value_t writer_obj = ws_stream_writer(stream_obj);
451 if (ws_is_writer(writer_obj)) {
452 ant_value_t closed = ws_writer_closed(writer_obj);
453 if (!is_undefined(closed)) js_resolve_promise(js, closed, js_mkundef());
454 }
455}
456
457static void writable_stream_finish_in_flight_close_with_error(ant_t *js, ant_value_t stream_obj, ant_value_t error) {
458 ant_value_t p = ws_stream_in_flight_close(stream_obj);
459 if (!is_undefined(p)) js_reject_promise(js, p, error);
460 js_set_slot(stream_obj, SLOT_WS_ABORT, js_mkundef());
461 writable_stream_deal_with_rejection(js, stream_obj, error);
462}
463
464static void writable_stream_mark_first_write_in_flight(ant_t *js, ant_value_t stream_obj) {
465 ant_value_t wr = ws_write_reqs_shift(js, stream_obj);
466 js_set_slot_wb(js, stream_obj, SLOT_DEFAULT, wr);
467}
468
469static void writable_stream_mark_close_in_flight(ant_t *js, ant_value_t stream_obj) {
470 ant_value_t cr = ws_stream_close_request(stream_obj);
471 js_set_slot_wb(js, stream_obj, SLOT_WS_ABORT, cr);
472 js_set_slot(stream_obj, SLOT_WS_CLOSE, js_mkundef());
473}
474
475static ant_value_t ws_process_write_resolve(ant_t *js, ant_value_t *args, int nargs) {
476 ant_value_t ctrl_obj = js_get_slot(js->current_func, SLOT_DATA);
477 ws_controller_t *ctrl = ws_get_controller(ctrl_obj);
478 if (!ctrl) return js_mkundef();
479
480 ant_value_t stream_obj = ws_ctrl_stream(ctrl_obj);
481 writable_stream_finish_in_flight_write(js, stream_obj);
482
483 ws_stream_t *stream = ws_get_stream(stream_obj);
484 if (!stream) return js_mkundef();
485
486 ws_ctrl_queue_shift(js, ctrl_obj);
487 if (ctrl->queue_sizes_len > 0) {
488 double sz = ctrl->queue_sizes[0];
489 ctrl->queue_sizes_len--;
490 memmove(ctrl->queue_sizes, ctrl->queue_sizes + 1, ctrl->queue_sizes_len * sizeof(double));
491 ctrl->queue_total_size -= sz;
492 if (ctrl->queue_total_size < 0) ctrl->queue_total_size = 0;
493 }
494
495 if (!writable_stream_close_queued_or_in_flight(stream_obj) && stream->state == WS_STATE_WRITABLE) {
496 bool bp = ws_default_controller_get_backpressure(ctrl);
497 writable_stream_update_backpressure(js, stream_obj, bp);
498 }
499
500 ws_default_controller_advance_queue_if_needed(js, ctrl_obj);
501 return js_mkundef();
502}
503
504static ant_value_t ws_process_write_reject(ant_t *js, ant_value_t *args, int nargs) {
505 ant_value_t ctrl_obj = js_get_slot(js->current_func, SLOT_DATA);
506 ant_value_t stream_obj = ws_ctrl_stream(ctrl_obj);
507 ws_stream_t *stream = ws_get_stream(stream_obj);
508
509 ant_value_t reason = (nargs > 0) ? args[0] : js_mkundef();
510
511 if (stream && stream->state == WS_STATE_WRITABLE)
512 ws_default_controller_clear_algorithms(ctrl_obj);
513
514 writable_stream_finish_in_flight_write_with_error(js, stream_obj, reason);
515 return js_mkundef();
516}
517
518static void ws_default_controller_process_write(ant_t *js, ant_value_t ctrl_obj, ant_value_t chunk) {
519 ant_value_t stream_obj = ws_ctrl_stream(ctrl_obj);
520 writable_stream_mark_first_write_in_flight(js, stream_obj);
521
522 ant_value_t write_fn = ws_ctrl_write_fn(ctrl_obj);
523 ant_value_t sink = ws_ctrl_sink(ctrl_obj);
524 ant_value_t result = js_mkundef();
525 if (is_callable(write_fn)) {
526 ant_value_t write_args[2] = { chunk, ctrl_obj };
527 result = sv_vm_call(js->vm, js, write_fn, sink, write_args, 2, NULL, false);
528 }
529
530 if (is_err(result)) {
531 ws_stream_t *stream = ws_get_stream(stream_obj);
532 ant_value_t thrown = js->thrown_value;
533 ant_value_t err = is_object_type(thrown) ? thrown : result;
534 if (stream && stream->state == WS_STATE_WRITABLE)
535 ws_default_controller_clear_algorithms(ctrl_obj);
536 writable_stream_finish_in_flight_write_with_error(js, stream_obj, err);
537 } else {
538 ant_value_t res_fn = js_heavy_mkfun(js, ws_process_write_resolve, ctrl_obj);
539 ant_value_t rej_fn = js_heavy_mkfun(js, ws_process_write_reject, ctrl_obj);
540 ws_chain_promise(js, result, res_fn, rej_fn);
541 }
542}
543
544static ant_value_t ws_process_close_resolve(ant_t *js, ant_value_t *args, int nargs) {
545 ant_value_t stream_obj = js_get_slot(js->current_func, SLOT_DATA);
546 writable_stream_finish_in_flight_close(js, stream_obj);
547 return js_mkundef();
548}
549
550static ant_value_t ws_process_close_reject(ant_t *js, ant_value_t *args, int nargs) {
551 ant_value_t stream_obj = js_get_slot(js->current_func, SLOT_DATA);
552 ant_value_t reason = (nargs > 0) ? args[0] : js_mkundef();
553 writable_stream_finish_in_flight_close_with_error(js, stream_obj, reason);
554 return js_mkundef();
555}
556
557static void ws_default_controller_process_close(ant_t *js, ant_value_t ctrl_obj) {
558 ant_value_t stream_obj = ws_ctrl_stream(ctrl_obj);
559 writable_stream_mark_close_in_flight(js, stream_obj);
560
561 ws_ctrl_queue_shift(js, ctrl_obj);
562 ws_controller_t *ctrl = ws_get_controller(ctrl_obj);
563 if (ctrl && ctrl->queue_sizes_len > 0) {
564 ctrl->queue_sizes_len--;
565 memmove(ctrl->queue_sizes, ctrl->queue_sizes + 1, ctrl->queue_sizes_len * sizeof(double));
566 }
567
568 ant_value_t close_fn = ws_ctrl_close_fn(ctrl_obj);
569 ant_value_t sink = ws_ctrl_sink(ctrl_obj);
570 ws_default_controller_clear_algorithms(ctrl_obj);
571
572 ant_value_t result = js_mkundef();
573 if (is_callable(close_fn))
574 result = sv_vm_call(js->vm, js, close_fn, sink, NULL, 0, NULL, false);
575
576 if (is_err(result)) {
577 ant_value_t thrown = js->thrown_value;
578 ant_value_t err = is_object_type(thrown) ? thrown : result;
579 writable_stream_finish_in_flight_close_with_error(js, stream_obj, err);
580 } else {
581 ant_value_t res_fn = js_heavy_mkfun(js, ws_process_close_resolve, stream_obj);
582 ant_value_t rej_fn = js_heavy_mkfun(js, ws_process_close_reject, stream_obj);
583 ws_chain_promise(js, result, res_fn, rej_fn);
584 }
585}
586
587void ws_default_controller_advance_queue_if_needed(ant_t *js, ant_value_t ctrl_obj) {
588 ws_controller_t *ctrl = ws_get_controller(ctrl_obj);
589 if (!ctrl || !ctrl->started) return;
590
591 ant_value_t stream_obj = ws_ctrl_stream(ctrl_obj);
592 ws_stream_t *stream = ws_get_stream(stream_obj);
593 if (!stream) return;
594
595 if (!is_undefined(ws_stream_in_flight_write(stream_obj))) return;
596
597 if (stream->state == WS_STATE_ERRORING) {
598 writable_stream_finish_erroring(js, stream_obj);
599 return;
600 }
601
602 if (ws_ctrl_queue_empty(ctrl_obj)) return;
603 ant_value_t value = ws_ctrl_queue_peek(js, ctrl_obj);
604
605 if (value == g_close_sentinel) ws_default_controller_process_close(js, ctrl_obj);
606 else ws_default_controller_process_write(js, ctrl_obj, value);
607}
608
609static void ws_default_controller_close(ant_t *js, ant_value_t ctrl_obj) {
610 ws_controller_t *ctrl = ws_get_controller(ctrl_obj);
611 if (!ctrl) return;
612
613 ws_ctrl_queue_push(js, ctrl_obj, g_close_sentinel);
614 if (ctrl->queue_sizes_len >= ctrl->queue_sizes_cap) {
615 uint32_t new_cap = ctrl->queue_sizes_cap ? ctrl->queue_sizes_cap * 2 : 8;
616 double *ns = realloc(ctrl->queue_sizes, new_cap * sizeof(double));
617 if (ns) { ctrl->queue_sizes = ns; ctrl->queue_sizes_cap = new_cap; }
618 }
619 if (ctrl->queue_sizes_len < ctrl->queue_sizes_cap)
620 ctrl->queue_sizes[ctrl->queue_sizes_len++] = 0;
621
622 ws_default_controller_advance_queue_if_needed(js, ctrl_obj);
623}
624
625static void ws_default_controller_write(ant_t *js, ant_value_t ctrl_obj, ant_value_t chunk, double chunk_size) {
626 ws_controller_t *ctrl = ws_get_controller(ctrl_obj);
627 if (!ctrl) return;
628
629 ws_ctrl_queue_push(js, ctrl_obj, chunk);
630 if (ctrl->queue_sizes_len >= ctrl->queue_sizes_cap) {
631 uint32_t new_cap = ctrl->queue_sizes_cap ? ctrl->queue_sizes_cap * 2 : 8;
632 double *ns = realloc(ctrl->queue_sizes, new_cap * sizeof(double));
633 if (ns) { ctrl->queue_sizes = ns; ctrl->queue_sizes_cap = new_cap; }
634 }
635 if (ctrl->queue_sizes_len < ctrl->queue_sizes_cap)
636 ctrl->queue_sizes[ctrl->queue_sizes_len++] = chunk_size;
637 ctrl->queue_total_size += chunk_size;
638
639 ant_value_t stream_obj = ws_ctrl_stream(ctrl_obj);
640 if (!writable_stream_close_queued_or_in_flight(stream_obj)) {
641 ws_stream_t *stream = ws_get_stream(stream_obj);
642 if (stream && stream->state == WS_STATE_WRITABLE) {
643 bool bp = ws_default_controller_get_backpressure(ctrl);
644 writable_stream_update_backpressure(js, stream_obj, bp);
645 }
646 }
647
648 ws_default_controller_advance_queue_if_needed(js, ctrl_obj);
649}
650
651void ws_default_controller_error(ant_t *js, ant_value_t ctrl_obj, ant_value_t error) {
652 ant_value_t stream_obj = ws_ctrl_stream(ctrl_obj);
653 ws_stream_t *stream = ws_get_stream(stream_obj);
654 if (!stream || stream->state != WS_STATE_WRITABLE) return;
655 ws_default_controller_clear_algorithms(ctrl_obj);
656 writable_stream_start_erroring(js, stream_obj, error);
657}
658
659ant_value_t writable_stream_close(ant_t *js, ant_value_t stream_obj) {
660 ws_stream_t *stream = ws_get_stream(stream_obj);
661 if (!stream) return js_mkundef();
662
663 if (stream->state == WS_STATE_CLOSED || stream->state == WS_STATE_ERRORED) {
664 ant_value_t p = js_mkpromise(js);
665 js_mkerr_typed(js, JS_ERR_TYPE, "Cannot close a stream that is already closed or errored");
666 js_reject_promise(js, p, js->thrown_value);
667 return p;
668 }
669 if (writable_stream_close_queued_or_in_flight(stream_obj)) {
670 ant_value_t p = js_mkpromise(js);
671 js_mkerr_typed(js, JS_ERR_TYPE, "Cannot close an already-closing stream");
672 js_reject_promise(js, p, js->thrown_value);
673 return p;
674 }
675
676 ant_value_t promise = js_mkpromise(js);
677 js_set_slot_wb(js, stream_obj, SLOT_WS_CLOSE, promise);
678
679 ant_value_t writer_obj = ws_stream_writer(stream_obj);
680 if (ws_is_writer(writer_obj) && stream->backpressure && stream->state == WS_STATE_WRITABLE) {
681 ant_value_t ready = ws_writer_ready(writer_obj);
682 if (!is_undefined(ready)) js_resolve_promise(js, ready, js_mkundef());
683 }
684
685 ant_value_t ctrl_obj = ws_stream_controller(stream_obj);
686 ws_default_controller_close(js, ctrl_obj);
687
688 return promise;
689}
690
691static ant_value_t ws_abort_resolve(ant_t *js, ant_value_t *args, int nargs) {
692 ant_value_t wrapper = js_get_slot(js->current_func, SLOT_DATA);
693 ant_value_t p = js_get_slot(wrapper, SLOT_DATA);
694 ant_value_t stream_obj = js_get_slot(wrapper, SLOT_ENTRIES);
695 js_resolve_promise(js, p, js_mkundef());
696
697 ws_stream_t *stream = ws_get_stream(stream_obj);
698 if (stream) stream->has_pending_abort = false;
699 js_set_slot(stream_obj, SLOT_WS_READY, js_mkundef());
700
701 ant_value_t stored_error = ws_stream_stored_error(stream_obj);
702 ant_value_t cr = ws_stream_close_request(stream_obj);
703
704 if (!is_undefined(cr)) {
705 js_reject_promise(js, cr, stored_error);
706 js_set_slot(stream_obj, SLOT_WS_CLOSE, js_mkundef());
707 }
708
709 ant_value_t writer_obj = ws_stream_writer(stream_obj);
710 if (ws_is_writer(writer_obj)) ws_writer_reject_closed_promise(js, writer_obj, stored_error);
711
712 return js_mkundef();
713}
714
715static ant_value_t ws_abort_reject(ant_t *js, ant_value_t *args, int nargs) {
716 ant_value_t wrapper = js_get_slot(js->current_func, SLOT_DATA);
717 ant_value_t p = js_get_slot(wrapper, SLOT_DATA);
718 ant_value_t stream_obj = js_get_slot(wrapper, SLOT_ENTRIES);
719 ant_value_t reason = (nargs > 0) ? args[0] : js_mkundef();
720 js_reject_promise(js, p, reason);
721
722 ws_stream_t *stream = ws_get_stream(stream_obj);
723 if (stream) stream->has_pending_abort = false;
724 js_set_slot(stream_obj, SLOT_WS_READY, js_mkundef());
725
726 ant_value_t stored_error = ws_stream_stored_error(stream_obj);
727 ant_value_t cr = ws_stream_close_request(stream_obj);
728
729 if (!is_undefined(cr)) {
730 js_reject_promise(js, cr, stored_error);
731 js_set_slot(stream_obj, SLOT_WS_CLOSE, js_mkundef());
732 }
733
734 ant_value_t writer_obj = ws_stream_writer(stream_obj);
735 if (ws_is_writer(writer_obj))
736 ws_writer_reject_closed_promise(js, writer_obj, stored_error);
737
738 return js_mkundef();
739}
740
741ant_value_t writable_stream_abort(ant_t *js, ant_value_t stream_obj, ant_value_t reason) {
742 ws_stream_t *stream = ws_get_stream(stream_obj);
743 if (!stream) return js_mkundef();
744
745 if (stream->state == WS_STATE_CLOSED || stream->state == WS_STATE_ERRORED) {
746 ant_value_t p = js_mkpromise(js);
747 js_resolve_promise(js, p, js_mkundef());
748 return p;
749 }
750
751 if (stream->has_pending_abort) {
752 return ws_stream_pending_abort_promise(stream_obj);
753 }
754
755 bool was_already_erroring = (stream->state == WS_STATE_ERRORING);
756
757 ant_value_t promise = js_mkpromise(js);
758 stream->has_pending_abort = true;
759 stream->pending_abort_was_already_erroring = was_already_erroring;
760 js_set_slot_wb(js, stream_obj, SLOT_WS_READY, promise);
761
762 if (!was_already_erroring)
763 writable_stream_start_erroring(js, stream_obj, reason);
764
765 return promise;
766}
767
768ant_value_t ws_writer_write(ant_t *js, ant_value_t writer_obj, ant_value_t chunk) {
769 ant_value_t stream_obj = ws_writer_stream(writer_obj);
770 if (!ws_is_stream(stream_obj)) {
771 ant_value_t p = js_mkpromise(js);
772 js_mkerr_typed(js, JS_ERR_TYPE, "Writer has no stream");
773 js_reject_promise(js, p, js->thrown_value);
774 return p;
775 }
776
777 ws_stream_t *stream = ws_get_stream(stream_obj);
778 if (!stream) {
779 ant_value_t p = js_mkpromise(js);
780 js_mkerr_typed(js, JS_ERR_TYPE, "Invalid WritableStream");
781 js_reject_promise(js, p, js->thrown_value);
782 return p;
783 }
784
785 if (stream->state == WS_STATE_CLOSED) {
786 ant_value_t p = js_mkpromise(js);
787 js_mkerr_typed(js, JS_ERR_TYPE, "Cannot write to a closed WritableStream");
788 js_reject_promise(js, p, js->thrown_value);
789 return p;
790 }
791
792 if (stream->state == WS_STATE_ERRORED) {
793 ant_value_t p = js_mkpromise(js);
794 js_reject_promise(js, p, ws_stream_stored_error(stream_obj));
795 return p;
796 }
797
798 if (writable_stream_close_queued_or_in_flight(stream_obj)) {
799 ant_value_t p = js_mkpromise(js);
800 js_mkerr_typed(js, JS_ERR_TYPE, "Cannot write to a closing WritableStream");
801 js_reject_promise(js, p, js->thrown_value);
802 return p;
803 }
804
805 if (stream->state == WS_STATE_ERRORING) {
806 ant_value_t p = js_mkpromise(js);
807 js_reject_promise(js, p, ws_stream_stored_error(stream_obj));
808 return p;
809 }
810
811 ant_value_t ctrl_obj = ws_stream_controller(stream_obj);
812 double chunk_size = 1;
813 ant_value_t size_fn = ws_ctrl_size_fn(ctrl_obj);
814
815 if (is_callable(size_fn)) {
816 ant_value_t size_args[1] = { chunk };
817 ant_value_t size_result = sv_vm_call(js->vm, js, size_fn, js_mkundef(), size_args, 1, NULL, false);
818 if (is_err(size_result)) {
819 ant_value_t thrown = js->thrown_value;
820 ant_value_t err = is_object_type(thrown) ? thrown : size_result;
821 ws_default_controller_error(js, ctrl_obj, err);
822 ant_value_t p = js_mkpromise(js);
823 js_reject_promise(js, p, err);
824 return p;
825 }
826 if (vtype(size_result) == T_NUM) chunk_size = js_getnum(size_result);
827 else chunk_size = js_to_number(js, size_result);
828 }
829
830 if (chunk_size < 0 || chunk_size != chunk_size || chunk_size == (double)INFINITY) {
831 js_mkerr_typed(js, JS_ERR_RANGE,
832 "The return value of a queuing strategy's size function must be a finite, non-NaN, non-negative number");
833 ant_value_t err = is_object_type(js->thrown_value) ? js->thrown_value : js_mkundef();
834 ws_default_controller_error(js, ctrl_obj, err);
835 ant_value_t p = js_mkpromise(js);
836 js_reject_promise(js, p, err);
837 return p;
838 }
839
840 ant_value_t p = js_mkpromise(js);
841 ws_write_reqs_push(js, stream_obj, p);
842 ws_default_controller_write(js, ctrl_obj, chunk, chunk_size);
843
844 return p;
845}
846
847static ant_value_t ws_start_resolve_handler(ant_t *js, ant_value_t *args, int nargs) {
848 ant_value_t ctrl_obj = js_get_slot(js->current_func, SLOT_DATA);
849 ws_controller_t *ctrl = ws_get_controller(ctrl_obj);
850 if (!ctrl) return js_mkundef();
851 ctrl->started = true;
852 ws_default_controller_advance_queue_if_needed(js, ctrl_obj);
853 return js_mkundef();
854}
855
856static ant_value_t ws_start_reject_handler(ant_t *js, ant_value_t *args, int nargs) {
857 ant_value_t ctrl_obj = js_get_slot(js->current_func, SLOT_DATA);
858 ws_controller_t *ctrl = ws_get_controller(ctrl_obj);
859 if (!ctrl) return js_mkundef();
860 ctrl->started = true;
861 ant_value_t stream_obj = ws_ctrl_stream(ctrl_obj);
862 writable_stream_deal_with_rejection(js, stream_obj, nargs > 0 ? args[0] : js_mkundef());
863 return js_mkundef();
864}
865
866static ant_value_t js_ws_controller_get_signal(ant_t *js, ant_value_t *args, int nargs) {
867 ant_value_t signal_ctrl = ws_ctrl_signal(js->this_val);
868 if (!is_object_type(signal_ctrl)) return js_mkundef();
869 return js_get(js, signal_ctrl, "signal");
870}
871
872static ant_value_t js_ws_controller_error(ant_t *js, ant_value_t *args, int nargs) {
873 ws_controller_t *ctrl = ws_get_controller(js->this_val);
874 if (!ctrl) return js_mkerr_typed(js, JS_ERR_TYPE, "Invalid WritableStreamDefaultController");
875 ant_value_t stream_obj = ws_ctrl_stream(js->this_val);
876 ws_stream_t *stream = ws_get_stream(stream_obj);
877 if (!stream || stream->state != WS_STATE_WRITABLE) return js_mkundef();
878 ant_value_t e = (nargs > 0) ? args[0] : js_mkundef();
879 ws_default_controller_error(js, js->this_val, e);
880 return js_mkundef();
881}
882
883static ant_value_t js_ws_writer_get_closed(ant_t *js, ant_value_t *args, int nargs) {
884 return ws_writer_closed(js->this_val);
885}
886
887static ant_value_t js_ws_writer_get_ready(ant_t *js, ant_value_t *args, int nargs) {
888 return ws_writer_ready(js->this_val);
889}
890
891static ant_value_t js_ws_writer_get_desired_size(ant_t *js, ant_value_t *args, int nargs) {
892 ant_value_t stream_obj = ws_writer_stream(js->this_val);
893 if (!ws_is_stream(stream_obj))
894 return js_mkerr_typed(js, JS_ERR_TYPE, "Writer has no stream");
895 ws_stream_t *stream = ws_get_stream(stream_obj);
896 if (!stream) return js_mknull();
897 if (stream->state == WS_STATE_ERRORED || stream->state == WS_STATE_ERRORING) return js_mknull();
898 if (stream->state == WS_STATE_CLOSED) return js_mknum(0);
899 ant_value_t ctrl_obj = ws_stream_controller(stream_obj);
900 ws_controller_t *ctrl = ws_get_controller(ctrl_obj);
901 if (!ctrl) return js_mknull();
902 return js_mknum(ws_default_controller_get_desired_size(ctrl));
903}
904
905static ant_value_t js_ws_writer_abort(ant_t *js, ant_value_t *args, int nargs) {
906 ant_value_t stream_obj = ws_writer_stream(js->this_val);
907 if (!ws_is_stream(stream_obj)) {
908 ant_value_t p = js_mkpromise(js);
909 js_mkerr_typed(js, JS_ERR_TYPE, "Writer has no stream");
910 js_reject_promise(js, p, js->thrown_value);
911 return p;
912 }
913 ant_value_t reason = (nargs > 0) ? args[0] : js_mkundef();
914 return writable_stream_abort(js, stream_obj, reason);
915}
916
917static ant_value_t js_ws_writer_close(ant_t *js, ant_value_t *args, int nargs) {
918 ant_value_t stream_obj = ws_writer_stream(js->this_val);
919 if (!ws_is_stream(stream_obj)) {
920 ant_value_t p = js_mkpromise(js);
921 js_mkerr_typed(js, JS_ERR_TYPE, "Writer has no stream");
922 js_reject_promise(js, p, js->thrown_value);
923 return p;
924 }
925 if (writable_stream_close_queued_or_in_flight(stream_obj)) {
926 ant_value_t p = js_mkpromise(js);
927 js_mkerr_typed(js, JS_ERR_TYPE, "Cannot close an already-closing stream");
928 js_reject_promise(js, p, js->thrown_value);
929 return p;
930 }
931 return writable_stream_close(js, stream_obj);
932}
933
934static ant_value_t js_ws_writer_release_lock(ant_t *js, ant_value_t *args, int nargs) {
935 ant_value_t stream_obj = ws_writer_stream(js->this_val);
936 if (!ws_is_stream(stream_obj)) return js_mkundef();
937 ant_value_t release_err = js_make_error_silent(js, JS_ERR_TYPE, "Writer was released");
938
939 ws_writer_reject_ready_promise(js, js->this_val, release_err);
940 ws_writer_reject_closed_promise(js, js->this_val, release_err);
941
942 promise_mark_handled(ws_writer_ready(js->this_val));
943 promise_mark_handled(ws_writer_closed(js->this_val));
944
945 ws_writer_replace_ready_promise_rejected(js, js->this_val, release_err);
946 ws_writer_replace_closed_promise_rejected(js, js->this_val, release_err);
947
948 js_set_slot(stream_obj, SLOT_CTOR, js_mkundef());
949 js_set_slot(js->this_val, SLOT_ENTRIES, js_mkundef());
950
951 return js_mkundef();
952}
953
954static ant_value_t js_ws_writer_write(ant_t *js, ant_value_t *args, int nargs) {
955 ant_value_t stream_obj = ws_writer_stream(js->this_val);
956 if (!ws_is_stream(stream_obj)) {
957 ant_value_t p = js_mkpromise(js);
958 js_mkerr_typed(js, JS_ERR_TYPE, "Writer has no stream");
959 js_reject_promise(js, p, js->thrown_value);
960 return p;
961 }
962 ant_value_t chunk = (nargs > 0) ? args[0] : js_mkundef();
963 return ws_writer_write(js, js->this_val, chunk);
964}
965
966ant_value_t js_ws_writer_ctor(ant_t *js, ant_value_t *args, int nargs) {
967 if (vtype(js->new_target) == T_UNDEF)
968 return js_mkerr_typed(js, JS_ERR_TYPE, "WritableStreamDefaultWriter constructor requires 'new'");
969 if (nargs < 1)
970 return js_mkerr_typed(js, JS_ERR_TYPE, "WritableStreamDefaultWriter requires a stream argument");
971
972 ant_value_t stream_obj = args[0];
973 if (!ws_is_stream(stream_obj))
974 return js_mkerr_typed(js, JS_ERR_TYPE, "WritableStreamDefaultWriter argument must be a WritableStream");
975 ws_stream_t *stream = ws_get_stream(stream_obj);
976 if (ws_is_writer(ws_stream_writer(stream_obj)))
977 return js_mkerr_typed(js, JS_ERR_TYPE, "WritableStream is already locked to a writer");
978
979 ant_value_t obj = js_mkobj(js);
980 ant_value_t proto = js_instance_proto_from_new_target(js, g_ws_writer_proto);
981
982 if (is_object_type(proto)) js_set_proto_init(obj, proto);
983 js_set_slot(obj, SLOT_BRAND, js_mknum(BRAND_WRITABLE_STREAM_WRITER));
984
985 ant_value_t closed = js_mkpromise(js);
986 ant_value_t ready = js_mkpromise(js);
987
988 promise_mark_handled(closed);
989 promise_mark_handled(ready);
990
991 js_set_slot(obj, SLOT_ENTRIES, stream_obj);
992 js_set_slot(obj, SLOT_RS_CLOSED, closed);
993 js_set_slot(obj, SLOT_WS_READY, ready);
994 js_set_slot(stream_obj, SLOT_CTOR, obj);
995
996 if (stream->state == WS_STATE_WRITABLE) {
997 if (writable_stream_close_queued_or_in_flight(stream_obj) || !stream->backpressure) js_resolve_promise(js, ready, js_mkundef());
998 } else if (stream->state == WS_STATE_ERRORING) {
999 ant_value_t stored_error = ws_stream_stored_error(stream_obj);
1000 js_reject_promise(js, ready, stored_error);
1001 } else if (stream->state == WS_STATE_CLOSED) {
1002 js_resolve_promise(js, ready, js_mkundef());
1003 js_resolve_promise(js, closed, js_mkundef());
1004 } else {
1005 ant_value_t stored_error = ws_stream_stored_error(stream_obj);
1006 js_reject_promise(js, ready, stored_error);
1007 js_reject_promise(js, closed, stored_error);
1008 }
1009
1010 return obj;
1011}
1012
1013ant_value_t ws_acquire_writer(ant_t *js, ant_value_t stream_obj) {
1014 ant_value_t writer_args[1] = { stream_obj };
1015 ant_value_t saved = js->new_target;
1016
1017 js->new_target = g_ws_writer_proto;
1018 ant_value_t writer = js_ws_writer_ctor(js, writer_args, 1);
1019 js->new_target = saved;
1020
1021 return writer;
1022}
1023
1024static ant_value_t js_ws_get_locked(ant_t *js, ant_value_t *args, int nargs) {
1025 ws_stream_t *stream = ws_get_stream(js->this_val);
1026 if (!stream) return js_mkerr_typed(js, JS_ERR_TYPE, "Invalid WritableStream");
1027 return js_bool(ws_is_writer(ws_stream_writer(js->this_val)));
1028}
1029
1030static ant_value_t js_ws_abort(ant_t *js, ant_value_t *args, int nargs) {
1031 ws_stream_t *stream = ws_get_stream(js->this_val);
1032 if (!stream) return js_mkerr_typed(js, JS_ERR_TYPE, "Invalid WritableStream");
1033 if (ws_is_writer(ws_stream_writer(js->this_val))) {
1034 ant_value_t p = js_mkpromise(js);
1035 js_mkerr_typed(js, JS_ERR_TYPE, "Cannot abort a locked WritableStream");
1036 js_reject_promise(js, p, js->thrown_value);
1037 return p;
1038 }
1039 ant_value_t reason = (nargs > 0) ? args[0] : js_mkundef();
1040 return writable_stream_abort(js, js->this_val, reason);
1041}
1042
1043static ant_value_t js_ws_close(ant_t *js, ant_value_t *args, int nargs) {
1044 ws_stream_t *stream = ws_get_stream(js->this_val);
1045 if (!stream) return js_mkerr_typed(js, JS_ERR_TYPE, "Invalid WritableStream");
1046 if (ws_is_writer(ws_stream_writer(js->this_val))) {
1047 ant_value_t p = js_mkpromise(js);
1048 js_mkerr_typed(js, JS_ERR_TYPE, "Cannot close a locked WritableStream");
1049 js_reject_promise(js, p, js->thrown_value);
1050 return p;
1051 }
1052 if (writable_stream_close_queued_or_in_flight(js->this_val)) {
1053 ant_value_t p = js_mkpromise(js);
1054 js_mkerr_typed(js, JS_ERR_TYPE, "Cannot close an already-closing stream");
1055 js_reject_promise(js, p, js->thrown_value);
1056 return p;
1057 }
1058 return writable_stream_close(js, js->this_val);
1059}
1060
1061static ant_value_t js_ws_get_writer(ant_t *js, ant_value_t *args, int nargs) {
1062 ws_stream_t *stream = ws_get_stream(js->this_val);
1063 if (!stream) return js_mkerr_typed(js, JS_ERR_TYPE, "Invalid WritableStream");
1064
1065 ant_value_t writer_args[1] = { js->this_val };
1066 ant_value_t saved_new_target = js->new_target;
1067 js->new_target = g_ws_writer_proto;
1068 ant_value_t writer = js_ws_writer_ctor(js, writer_args, 1);
1069 js->new_target = saved_new_target;
1070 return writer;
1071}
1072
1073static ant_value_t setup_ws_default_controller(
1074 ant_t *js, ant_value_t stream_obj, ant_value_t underlying_sink,
1075 ant_value_t write_fn, ant_value_t close_fn, ant_value_t abort_fn,
1076 ant_value_t size_fn, double hwm
1077) {
1078 ws_controller_t *ctrl = calloc(1, sizeof(ws_controller_t));
1079 if (!ctrl) return js_mkerr(js, "out of memory");
1080 ctrl->strategy_hwm = hwm;
1081
1082 ant_value_t ctrl_obj = js_mkobj(js);
1083 js_set_proto_init(ctrl_obj, g_ws_controller_proto);
1084 js_set_slot(ctrl_obj, SLOT_BRAND, js_mknum(BRAND_WRITABLE_STREAM_CONTROLLER));
1085 js_set_slot(ctrl_obj, SLOT_DATA, ANT_PTR(ctrl));
1086 js_set_slot(ctrl_obj, SLOT_ENTRIES, stream_obj);
1087 js_set_slot(ctrl_obj, SLOT_WS_WRITE, write_fn);
1088 js_set_slot(ctrl_obj, SLOT_WS_CLOSE, close_fn);
1089 js_set_slot(ctrl_obj, SLOT_WS_ABORT, abort_fn);
1090 js_set_slot(ctrl_obj, SLOT_RS_SIZE, size_fn);
1091 js_set_slot(ctrl_obj, SLOT_CTOR, underlying_sink);
1092 js_set_slot(ctrl_obj, SLOT_AUX, js_mkarr(js));
1093 js_set_finalizer(ctrl_obj, ws_controller_finalize);
1094
1095 ant_value_t ac_ctor = js_get(js, js_glob(js), "AbortController");
1096 ant_value_t ac = js_mkundef();
1097 if (is_callable(ac_ctor)) {
1098 ant_value_t ac_proto = js_get(js, ac_ctor, "prototype");
1099 ac = js_mkobj(js);
1100 if (is_object_type(ac_proto)) js_set_proto_init(ac, ac_proto);
1101 ant_value_t saved = js->new_target;
1102 js->new_target = ac_ctor;
1103 ant_value_t result = sv_vm_call(js->vm, js, ac_ctor, ac, NULL, 0, NULL, false);
1104 js->new_target = saved;
1105 if (is_err(result)) ac = js_mkundef();
1106 }
1107
1108 js_set_slot(ctrl_obj, SLOT_WS_SIGNAL, ac);
1109 js_set_slot(stream_obj, SLOT_ENTRIES, ctrl_obj);
1110
1111 bool backpressure = ws_default_controller_get_backpressure(ctrl);
1112 writable_stream_update_backpressure(js, stream_obj, backpressure);
1113
1114 return ctrl_obj;
1115}
1116
1117static ant_value_t js_ws_ctor(ant_t *js, ant_value_t *args, int nargs) {
1118 if (vtype(js->new_target) == T_UNDEF)
1119 return js_mkerr_typed(js, JS_ERR_TYPE, "WritableStream constructor requires 'new'");
1120
1121 ant_value_t underlying_sink = js_mkundef();
1122 if (nargs > 0 && !is_undefined(args[0])) {
1123 if (is_null(args[0]))
1124 return js_mkerr_typed(js, JS_ERR_TYPE, "The underlying sink cannot be null");
1125 underlying_sink = args[0];
1126 }
1127
1128 if (is_object_type(underlying_sink)) {
1129 ant_value_t type_val = js_get(js, underlying_sink, "type");
1130 if (!is_undefined(type_val))
1131 return js_mkerr_typed(js, JS_ERR_RANGE, "Invalid type is specified");
1132 }
1133
1134 ant_value_t strategy = js_mkundef();
1135 if (nargs > 1 && !is_undefined(args[1]) && !is_null(args[1]))
1136 strategy = args[1];
1137
1138 double hwm = 1;
1139 if (is_object_type(strategy)) {
1140 ant_value_t hwm_val = js_get(js, strategy, "highWaterMark");
1141 if (is_err(hwm_val)) return hwm_val;
1142 if (!is_undefined(hwm_val)) {
1143 hwm = js_to_number(js, hwm_val);
1144 if (hwm != hwm || hwm < 0) return js_mkerr_typed(js, JS_ERR_RANGE, "Invalid highWaterMark");
1145 }
1146 }
1147
1148 ant_value_t size_fn = js_mkundef();
1149 if (is_object_type(strategy)) {
1150 ant_value_t s = js_get(js, strategy, "size");
1151 if (is_err(s)) return s;
1152 if (!is_undefined(s)) {
1153 if (!is_callable(s)) return js_mkerr_typed(js, JS_ERR_TYPE, "size must be a function");
1154 size_fn = s;
1155 }
1156 }
1157
1158 ws_stream_t *st = calloc(1, sizeof(ws_stream_t));
1159 if (!st) return js_mkerr(js, "out of memory");
1160 st->state = WS_STATE_WRITABLE;
1161
1162 ant_value_t obj = js_mkobj(js);
1163 ant_value_t proto = js_instance_proto_from_new_target(js, g_ws_proto);
1164
1165 if (is_object_type(proto)) js_set_proto_init(obj, proto);
1166 js_set_slot(obj, SLOT_BRAND, js_mknum(BRAND_WRITABLE_STREAM));
1167 js_set_slot(obj, SLOT_DATA, ANT_PTR(st));
1168 js_set_slot(obj, SLOT_SETTLED, js_mkarr(js));
1169 js_set_finalizer(obj, ws_stream_finalize);
1170
1171 ant_value_t write_fn = js_mkundef();
1172 ant_value_t close_fn = js_mkundef();
1173 ant_value_t abort_fn = js_mkundef();
1174 ant_value_t start_fn = js_mkundef();
1175
1176 if (is_object_type(underlying_sink)) {
1177 ant_value_t wv = js_getprop_fallback(js, underlying_sink, "write");
1178 if (is_err(wv)) return wv;
1179 if (!is_undefined(wv)) {
1180 if (!is_callable(wv)) return js_mkerr_typed(js, JS_ERR_TYPE, "write must be a function");
1181 write_fn = wv;
1182 }
1183
1184 ant_value_t cv = js_getprop_fallback(js, underlying_sink, "close");
1185 if (is_err(cv)) return cv;
1186 if (!is_undefined(cv)) {
1187 if (!is_callable(cv)) return js_mkerr_typed(js, JS_ERR_TYPE, "close must be a function");
1188 close_fn = cv;
1189 }
1190
1191 ant_value_t av = js_getprop_fallback(js, underlying_sink, "abort");
1192 if (is_err(av)) return av;
1193 if (!is_undefined(av)) {
1194 if (!is_callable(av)) return js_mkerr_typed(js, JS_ERR_TYPE, "abort must be a function");
1195 abort_fn = av;
1196 }
1197
1198 ant_value_t sv = js_getprop_fallback(js, underlying_sink, "start");
1199 if (is_err(sv)) return sv;
1200 if (!is_undefined(sv)) {
1201 if (!is_callable(sv)) return js_mkerr_typed(js, JS_ERR_TYPE, "start must be a function");
1202 start_fn = sv;
1203 }
1204 }
1205
1206 ant_value_t ctrl_obj = setup_ws_default_controller(js, obj, underlying_sink, write_fn, close_fn, abort_fn, size_fn, hwm);
1207 if (is_err(ctrl_obj)) return ctrl_obj;
1208
1209 if (is_callable(start_fn)) {
1210 ant_value_t start_args[1] = { ctrl_obj };
1211 ant_value_t start_result = sv_vm_call(js->vm, js, start_fn, underlying_sink, start_args, 1, NULL, false);
1212 if (is_err(start_result)) return start_result;
1213
1214 if (vtype(start_result) == T_PROMISE) {
1215 ant_value_t resolve_fn = js_heavy_mkfun(js, ws_start_resolve_handler, ctrl_obj);
1216 ant_value_t reject_fn = js_heavy_mkfun(js, ws_start_reject_handler, ctrl_obj);
1217 js_promise_then(js, start_result, resolve_fn, reject_fn);
1218 }
1219
1220 if (vtype(start_result) != T_PROMISE) {
1221 ant_value_t resolved = js_mkpromise(js);
1222 js_resolve_promise(js, resolved, js_mkundef());
1223 ant_value_t res_fn = js_heavy_mkfun(js, ws_start_resolve_handler, ctrl_obj);
1224 ant_value_t rej_fn = js_heavy_mkfun(js, ws_start_reject_handler, ctrl_obj);
1225 js_promise_then(js, resolved, res_fn, rej_fn);
1226 }
1227 } else {
1228 ant_value_t resolved = js_mkpromise(js);
1229 js_resolve_promise(js, resolved, js_mkundef());
1230 ant_value_t res_fn = js_heavy_mkfun(js, ws_start_resolve_handler, ctrl_obj);
1231 ant_value_t rej_fn = js_heavy_mkfun(js, ws_start_reject_handler, ctrl_obj);
1232 js_promise_then(js, resolved, res_fn, rej_fn);
1233 }
1234
1235 return obj;
1236}
1237
1238static ant_value_t js_ws_controller_ctor(ant_t *js, ant_value_t *args, int nargs) {
1239 return js_mkerr_typed(js, JS_ERR_TYPE, "WritableStreamDefaultController cannot be constructed directly");
1240}
1241
1242void gc_mark_writable_streams(ant_t *js, void (*mark)(ant_t *, ant_value_t)) {
1243 mark(js, g_ws_proto);
1244 mark(js, g_ws_writer_proto);
1245 mark(js, g_ws_controller_proto);
1246 mark(js, g_close_sentinel);
1247}
1248
1249void init_writable_stream_module(void) {
1250 ant_t *js = rt->js;
1251 ant_value_t g = js_glob(js);
1252
1253 g_close_sentinel = js_mkobj(js);
1254 g_ws_controller_proto = js_mkobj(js);
1255
1256 js_set_getter_desc(js, g_ws_controller_proto, "signal", 6, js_mkfun(js_ws_controller_get_signal), JS_DESC_C);
1257 js_set(js, g_ws_controller_proto, "error", js_mkfun(js_ws_controller_error));
1258 js_set_descriptor(js, g_ws_controller_proto, "error", 5, JS_DESC_W | JS_DESC_C);
1259 js_set_sym(js, g_ws_controller_proto, get_toStringTag_sym(), js_mkstr(js, "WritableStreamDefaultController", 31));
1260
1261 ant_value_t ctrl_ctor = js_make_ctor(js, js_ws_controller_ctor, g_ws_controller_proto, "WritableStreamDefaultController", 31);
1262 js_set(js, g, "WritableStreamDefaultController", ctrl_ctor);
1263 js_set_descriptor(js, g, "WritableStreamDefaultController", 31, JS_DESC_W | JS_DESC_C);
1264
1265 g_ws_writer_proto = js_mkobj(js);
1266 js_set_getter_desc(js, g_ws_writer_proto, "closed", 6, js_mkfun(js_ws_writer_get_closed), JS_DESC_C);
1267 js_set_getter_desc(js, g_ws_writer_proto, "desiredSize", 11, js_mkfun(js_ws_writer_get_desired_size), JS_DESC_C);
1268 js_set_getter_desc(js, g_ws_writer_proto, "ready", 5, js_mkfun(js_ws_writer_get_ready), JS_DESC_C);
1269 js_set(js, g_ws_writer_proto, "abort", js_mkfun(js_ws_writer_abort));
1270 js_set_descriptor(js, g_ws_writer_proto, "abort", 5, JS_DESC_W | JS_DESC_C);
1271 js_set(js, g_ws_writer_proto, "close", js_mkfun(js_ws_writer_close));
1272 js_set_descriptor(js, g_ws_writer_proto, "close", 5, JS_DESC_W | JS_DESC_C);
1273 js_set(js, g_ws_writer_proto, "releaseLock", js_mkfun(js_ws_writer_release_lock));
1274 js_set_descriptor(js, g_ws_writer_proto, "releaseLock", 11, JS_DESC_W | JS_DESC_C);
1275 js_set(js, g_ws_writer_proto, "write", js_mkfun(js_ws_writer_write));
1276 js_set_descriptor(js, g_ws_writer_proto, "write", 5, JS_DESC_W | JS_DESC_C);
1277 js_set_sym(js, g_ws_writer_proto, get_toStringTag_sym(), js_mkstr(js, "WritableStreamDefaultWriter", 27));
1278
1279 ant_value_t writer_ctor = js_make_ctor(js, js_ws_writer_ctor, g_ws_writer_proto, "WritableStreamDefaultWriter", 27);
1280 js_set(js, g, "WritableStreamDefaultWriter", writer_ctor);
1281 js_set_descriptor(js, g, "WritableStreamDefaultWriter", 27, JS_DESC_W | JS_DESC_C);
1282
1283 g_ws_proto = js_mkobj(js);
1284 js_set_getter_desc(js, g_ws_proto, "locked", 6, js_mkfun(js_ws_get_locked), JS_DESC_C);
1285 js_set(js, g_ws_proto, "abort", js_mkfun(js_ws_abort));
1286 js_set_descriptor(js, g_ws_proto, "abort", 5, JS_DESC_W | JS_DESC_C);
1287 js_set(js, g_ws_proto, "close", js_mkfun(js_ws_close));
1288 js_set_descriptor(js, g_ws_proto, "close", 5, JS_DESC_W | JS_DESC_C);
1289 js_set(js, g_ws_proto, "getWriter", js_mkfun(js_ws_get_writer));
1290 js_set_descriptor(js, g_ws_proto, "getWriter", 9, JS_DESC_W | JS_DESC_C);
1291 js_set_sym(js, g_ws_proto, get_toStringTag_sym(), js_mkstr(js, "WritableStream", 14));
1292
1293 ant_value_t ws_ctor = js_make_ctor(js, js_ws_ctor, g_ws_proto, "WritableStream", 14);
1294 js_set(js, g, "WritableStream", ws_ctor);
1295 js_set_descriptor(js, g, "WritableStream", 14, JS_DESC_W | JS_DESC_C);
1296}