MIRROR: javascript for 🐜's, a tiny runtime with big ambitions
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at mir/inline-method 1052 lines 36 kB view raw
1// stub: minimal node:worker_threads implementation 2// just enough for rolldown to run transforms 3 4#include <compat.h> // IWYU pragma: keep 5 6#include <uv.h> 7#include <stdio.h> 8#include <stdlib.h> 9#include <string.h> 10#include <signal.h> 11 12#ifdef _WIN32 13#include <io.h> 14#define WT_WRITE _write 15#define WT_STDOUT_FD 1 16extern char **_environ; 17#define environ _environ 18#else 19#include <unistd.h> 20#define WT_WRITE write 21#define WT_STDOUT_FD STDOUT_FILENO 22extern char **environ; 23#endif 24 25#include "ant.h" 26#include "internal.h" 27#include "runtime.h" 28#include "descriptors.h" 29#include "silver/engine.h" 30#include "modules/json.h" 31#include "modules/symbol.h" 32#include "modules/worker_threads.h" 33#include "gc/modules.h" 34 35#define WT_ENV_MODE "ANT_WORKER_THREADS_MODE" 36#define WT_ENV_DATA_JSON "ANT_WORKER_DATA_JSON" 37#define WT_ENV_STORE_JSON "ANT_WORKER_ENV_DATA_JSON" 38#define WT_MSG_PREFIX "ANT_WT_MSG:" 39 40typedef struct ant_worker_thread { 41 ant_t *js; 42 uv_process_t process; 43 uv_pipe_t stdout_pipe; 44 bool spawned; 45 bool exited; 46 bool closing; 47 bool closed; 48 bool refed; 49 int close_pending; 50 int64_t exit_status; 51 int term_signal; 52 char *line_buf; 53 size_t line_len; 54 size_t line_cap; 55 ant_value_t self_val; 56 ant_value_t terminate_val; 57 bool has_terminate_val; 58 struct ant_worker_thread *next; 59 struct ant_worker_thread *prev; 60} ant_worker_thread_t; 61 62static ant_worker_thread_t *active_workers_head = NULL; 63 64static bool wt_is_worker_mode(void) { 65 const char *mode = getenv(WT_ENV_MODE); 66 return mode && strcmp(mode, "1") == 0; 67} 68 69static ant_value_t wt_get_or_create_env_store(ant_t *js) { 70 ant_value_t store = js_get_slot(js->global, SLOT_WT_ENV_STORE); 71 if (!is_object_type(store)) { 72 store = js_mkobj(js); 73 js_set_slot(js->global, SLOT_WT_ENV_STORE, store); 74 } 75 return store; 76} 77 78static void wt_init_env_store(ant_t *js, bool is_worker) { 79 ant_value_t store = 
js_mkobj(js); 80 if (is_worker) { 81 const char *raw = getenv(WT_ENV_STORE_JSON); 82 if (raw && raw[0]) { 83 ant_value_t input = js_mkstr(js, raw, strlen(raw)); 84 ant_value_t parsed = json_parse_value(js, input); 85 if (is_object_type(parsed)) store = parsed; 86 } 87 } 88 js_set_slot(js->global, SLOT_WT_ENV_STORE, store); 89} 90 91static ant_worker_thread_t *wt_get_worker(ant_t *js, ant_value_t this_obj) { 92 if (!is_object_type(this_obj)) return NULL; 93 ant_value_t data = js_get_slot(this_obj, SLOT_DATA); 94 if (vtype(data) != T_NUM) return NULL; 95 return (ant_worker_thread_t *)(uintptr_t)js_getnum(data); 96} 97 98static bool wt_is_message_port(ant_t *js, ant_value_t obj) { 99 if (!is_object_type(obj)) return false; 100 ant_value_t tag = js_get_slot(obj, SLOT_WT_PORT_TAG); 101 return js_truthy(js, tag); 102} 103 104static ant_value_t wt_get_message_port_proto(ant_t *js) { 105 ant_value_t proto = js_get_slot(js->global, SLOT_WT_PORT_PROTO); 106 if (!is_object_type(proto)) proto = js_mkobj(js); 107 return proto; 108} 109 110static ant_value_t wt_make_message_port(ant_t *js) { 111 ant_value_t port = js_mkobj(js); 112 js_set_proto_init(port, wt_get_message_port_proto(js)); 113 js_set_slot(port, SLOT_WT_PORT_TAG, js_true); 114 js_set_slot(port, SLOT_WT_PORT_QUEUE, js_mkarr(js)); 115 js_set_slot(port, SLOT_WT_PORT_HEAD, js_mknum(0)); 116 js_set_slot(port, SLOT_WT_PORT_PEER, js_mknull()); 117 js_set_slot(port, SLOT_WT_PORT_CLOSED, js_false); 118 js_set_slot(port, SLOT_WT_PORT_STARTED, js_false); 119 js_set_slot(port, SLOT_WT_PORT_ON_MESSAGE, js_mkundef()); 120 js_set_slot(port, SLOT_WT_PORT_ONCE_MESSAGE, js_mkundef()); 121 js_set(js, port, "onmessage", js_mkundef()); 122 return port; 123} 124 125static bool wt_port_is_closed(ant_t *js, ant_value_t port) { 126 return js_truthy(js, js_get_slot(port, SLOT_WT_PORT_CLOSED)); 127} 128 129static void wt_port_set_closed(ant_t *js, ant_value_t port, bool closed) { 130 js_set_slot(port, SLOT_WT_PORT_CLOSED, js_bool(closed)); 
131} 132 133static bool wt_port_queue_dequeue(ant_t *js, ant_value_t port, ant_value_t *out) { 134 ant_value_t queue = js_get_slot(port, SLOT_WT_PORT_QUEUE); 135 if (vtype(queue) != T_ARR) return false; 136 137 ant_value_t head_val = js_get_slot(port, SLOT_WT_PORT_HEAD); 138 ant_offset_t head = (vtype(head_val) == T_NUM) ? (ant_offset_t)js_getnum(head_val) : 0; 139 ant_offset_t len = js_arr_len(js, queue); 140 if (len <= 0 || head >= len) { 141 js_set_slot(port, SLOT_WT_PORT_QUEUE, js_mkarr(js)); 142 js_set_slot(port, SLOT_WT_PORT_HEAD, js_mknum(0)); 143 return false; 144 } 145 146 if (out) *out = js_arr_get(js, queue, head); 147 ant_offset_t next_head = head + 1; 148 149 if (next_head >= len) { 150 js_set_slot(port, SLOT_WT_PORT_QUEUE, js_mkarr(js)); 151 js_set_slot(port, SLOT_WT_PORT_HEAD, js_mknum(0)); 152 return true; 153 } 154 155 if (next_head > 32 && next_head * 2 >= len) { 156 ant_value_t compact = js_mkarr(js); 157 for (ant_offset_t i = next_head; i < len; i++) js_arr_push(js, compact, js_arr_get(js, queue, i)); 158 js_set_slot(port, SLOT_WT_PORT_QUEUE, compact); 159 js_set_slot(port, SLOT_WT_PORT_HEAD, js_mknum(0)); 160 return true; 161 } 162 163 js_set_slot(port, SLOT_WT_PORT_HEAD, js_mknum((double)next_head)); 164 return true; 165} 166 167static void wt_port_queue_push(ant_t *js, ant_value_t port, ant_value_t value) { 168 ant_value_t queue = js_get_slot(port, SLOT_WT_PORT_QUEUE); 169 if (vtype(queue) != T_ARR) { 170 queue = js_mkarr(js); 171 js_set_slot(port, SLOT_WT_PORT_QUEUE, queue); 172 js_set_slot(port, SLOT_WT_PORT_HEAD, js_mknum(0)); 173 } 174 js_arr_push(js, queue, value); 175} 176 177static void wt_port_call_listener(ant_t *js, ant_value_t this_obj, ant_value_t fn, ant_value_t arg) { 178 if (!is_callable(fn)) return; 179 ant_value_t argv[1] = {arg}; 180 181 sv_vm_call(js->vm, js, fn, this_obj, argv, 1, NULL, false); 182} 183 184static bool wt_port_should_deliver(ant_t *js, ant_value_t port) { 185 if (wt_port_is_closed(js, port)) return false; 
186 bool started = js_truthy(js, js_get_slot(port, SLOT_WT_PORT_STARTED)); 187 ant_value_t on_fn = js_get_slot(port, SLOT_WT_PORT_ON_MESSAGE); 188 ant_value_t once_fn = js_get_slot(port, SLOT_WT_PORT_ONCE_MESSAGE); 189 ant_value_t onmessage = js_get(js, port, "onmessage"); 190 191 bool has_event_listener = is_callable(on_fn) || is_callable(once_fn); 192 if (is_callable(onmessage)) return true; 193 return started && has_event_listener; 194} 195 196static void wt_port_drain(ant_t *js, ant_value_t port) { 197 if (!wt_is_message_port(js, port)) return; 198 199 while (wt_port_should_deliver(js, port)) { 200 ant_value_t msg = js_mkundef(); 201 if (!wt_port_queue_dequeue(js, port, &msg)) break; 202 203 ant_value_t on_fn = js_get_slot(port, SLOT_WT_PORT_ON_MESSAGE); 204 wt_port_call_listener(js, port, on_fn, msg); 205 206 ant_value_t once_fn = js_get_slot(port, SLOT_WT_PORT_ONCE_MESSAGE); 207 if (is_callable(once_fn)) { 208 wt_port_call_listener(js, port, once_fn, msg); 209 js_set_slot(port, SLOT_WT_PORT_ONCE_MESSAGE, js_mkundef()); 210 } 211 212 ant_value_t onmessage = js_get(js, port, "onmessage"); 213 if (is_callable(onmessage)) { 214 ant_value_t event_obj = js_mkobj(js); 215 js_set(js, event_obj, "data", msg); 216 wt_port_call_listener(js, port, onmessage, event_obj); 217 } 218 219 if (wt_port_is_closed(js, port)) break; 220 } 221} 222 223static ant_value_t wt_make_resolved_promise(ant_t *js, ant_value_t value) { 224 ant_value_t p = js_mkpromise(js); 225 js_resolve_promise(js, p, value); 226 return p; 227} 228 229static void wt_call_listener(ant_t *js, ant_value_t this_obj, ant_value_t fn, ant_value_t arg) { 230 if (!is_callable(fn)) return; 231 ant_value_t argv[1] = {arg}; 232 233 sv_vm_call(js->vm, js, fn, this_obj, argv, 1, NULL, false); 234} 235 236static void wt_emit(ant_worker_thread_t *wt, const char *event, ant_value_t arg) { 237 if (!wt || !wt->js) return; 238 ant_t *js = wt->js; 239 ant_value_t this_obj = wt->self_val; 240 if (!is_object_type(this_obj)) 
return; 241 242 internal_slot_t on_slot, once_slot; 243 if (strcmp(event, "message") == 0) { 244 on_slot = SLOT_WT_ON_MESSAGE; 245 once_slot = SLOT_WT_ONCE_MESSAGE; 246 } else if (strcmp(event, "exit") == 0) { 247 on_slot = SLOT_WT_ON_EXIT; 248 once_slot = SLOT_WT_ONCE_EXIT; 249 } else return; 250 251 ant_value_t on_fn = js_get_slot(this_obj, on_slot); 252 wt_call_listener(js, this_obj, on_fn, arg); 253 254 ant_value_t once_fn = js_get_slot(this_obj, once_slot); 255 if (is_callable(once_fn)) { 256 wt_call_listener(js, this_obj, once_fn, arg); 257 js_set_slot(this_obj, once_slot, js_mkundef()); 258 } 259} 260 261static void wt_free_env(char **env) { 262 if (!env) return; 263 for (char **p = env; *p; p++) free(*p); 264 free(env); 265} 266 267static char **wt_build_worker_env(const char *worker_data_json, const char *env_store_json) { 268 size_t count = 0; 269 if (environ) { 270 while (environ[count]) count++; 271 } 272 273 size_t extra = 2; 274 if (worker_data_json) extra++; 275 if (env_store_json) extra++; 276 char **env = (char **)calloc(count + extra, sizeof(char *)); 277 if (!env) return NULL; 278 279 size_t out = 0; 280 for (size_t i = 0; i < count; i++) { 281 env[out] = strdup(environ[i]); 282 if (!env[out]) { 283 wt_free_env(env); 284 return NULL; 285 } 286 out++; 287 } 288 289 env[out++] = strdup(WT_ENV_MODE "=1"); 290 if (!env[out - 1]) { 291 wt_free_env(env); 292 return NULL; 293 } 294 295 if (worker_data_json) { 296 size_t key_len = strlen(WT_ENV_DATA_JSON); 297 size_t val_len = strlen(worker_data_json); 298 char *entry = (char *)malloc(key_len + 1 + val_len + 1); 299 if (!entry) { 300 wt_free_env(env); 301 return NULL; 302 } 303 memcpy(entry, WT_ENV_DATA_JSON, key_len); 304 entry[key_len] = '='; 305 memcpy(entry + key_len + 1, worker_data_json, val_len); 306 entry[key_len + 1 + val_len] = '\0'; 307 env[out++] = entry; 308 } 309 310 if (env_store_json) { 311 size_t key_len = strlen(WT_ENV_STORE_JSON); 312 size_t val_len = strlen(env_store_json); 313 char 
*entry = (char *)malloc(key_len + 1 + val_len + 1); 314 if (!entry) { 315 wt_free_env(env); 316 return NULL; 317 } 318 memcpy(entry, WT_ENV_STORE_JSON, key_len); 319 entry[key_len] = '='; 320 memcpy(entry + key_len + 1, env_store_json, val_len); 321 entry[key_len + 1 + val_len] = '\0'; 322 env[out++] = entry; 323 } 324 325 env[out] = NULL; 326 return env; 327} 328 329static void wt_cleanup(ant_worker_thread_t *wt) { 330 if (!wt) return; 331 free(wt->line_buf); 332 free(wt); 333} 334 335static void wt_detach(ant_worker_thread_t *wt) { 336 if (!wt) return; 337 338 if (wt->prev) wt->prev->next = wt->next; 339 else active_workers_head = wt->next; 340 if (wt->next) wt->next->prev = wt->prev; 341 wt->prev = NULL; 342 wt->next = NULL; 343 344 wt->self_val = js_mkundef(); 345 wt->terminate_val = js_mkundef(); 346 wt->has_terminate_val = false; 347 free(wt->line_buf); 348 wt->line_buf = NULL; 349 wt->line_len = 0; 350 wt->line_cap = 0; 351 wt->spawned = false; 352 wt->refed = false; 353 wt->closed = true; 354 wt->js = NULL; 355} 356 357static void wt_on_handle_closed(uv_handle_t *h) { 358 ant_worker_thread_t *wt = (ant_worker_thread_t *)h->data; 359 if (!wt) return; 360 if (wt->close_pending > 0) wt->close_pending--; 361 if (wt->close_pending == 0) wt_detach(wt); 362} 363 364static void wt_finish_exit(ant_worker_thread_t *wt) { 365 if (!wt) return; 366 if (wt->closed || wt->closing) return; 367 wt->closing = true; 368 wt->spawned = false; 369 wt->refed = false; 370 wt->close_pending = 0; 371 372 if (!uv_is_closing((uv_handle_t *)&wt->stdout_pipe)) { 373 wt->close_pending++; 374 uv_close((uv_handle_t *)&wt->stdout_pipe, wt_on_handle_closed); 375 } 376 if (!uv_is_closing((uv_handle_t *)&wt->process)) { 377 wt->close_pending++; 378 uv_close((uv_handle_t *)&wt->process, wt_on_handle_closed); 379 } 380 381 if (wt->close_pending == 0) wt_detach(wt); 382} 383 384static void wt_on_process_exit(uv_process_t *proc, int64_t exit_status, int term_signal) { 385 ant_worker_thread_t *wt = 
(ant_worker_thread_t *)proc->data; 386 if (!wt || !wt->js) return; 387 388 wt->exited = true; 389 wt->exit_status = exit_status; 390 wt->term_signal = term_signal; 391 392 if (wt->has_terminate_val) { 393 ant_value_t p = wt->terminate_val; 394 js_resolve_promise(wt->js, p, js_mknum((double)exit_status)); 395 wt->terminate_val = js_mkundef(); 396 wt->has_terminate_val = false; 397 } 398 399 wt_emit(wt, "exit", js_mknum((double)exit_status)); 400 wt_finish_exit(wt); 401} 402 403static void wt_alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { 404 buf->base = (char *)malloc(suggested_size); 405 buf->len = buf->base ? suggested_size : 0; 406} 407 408static void wt_emit_message_from_json(ant_worker_thread_t *wt, const char *json, size_t len) { 409 if (!wt || !wt->js) return; 410 ant_t *js = wt->js; 411 412 ant_value_t s = js_mkstr(js, json, len); 413 ant_value_t msg = json_parse_value(js, s); 414 if (is_err(msg)) msg = s; 415 416 wt_emit(wt, "message", msg); 417} 418 419static void wt_process_lines(ant_worker_thread_t *wt) { 420 if (!wt || !wt->line_buf) return; 421 422 for (;;) { 423 char *nl = memchr(wt->line_buf, '\n', wt->line_len); 424 if (!nl) break; 425 426 size_t line_len = (size_t)(nl - wt->line_buf); 427 if (line_len > 0 && wt->line_buf[line_len - 1] == '\r') line_len--; 428 429 size_t prefix_len = strlen(WT_MSG_PREFIX); 430 if (line_len >= prefix_len && memcmp(wt->line_buf, WT_MSG_PREFIX, prefix_len) == 0) { 431 const char *payload = wt->line_buf + prefix_len; 432 size_t payload_len = line_len - prefix_len; 433 wt_emit_message_from_json(wt, payload, payload_len); 434 } 435 436 size_t consumed = (size_t)(nl - wt->line_buf) + 1; 437 size_t remain = wt->line_len - consumed; 438 memmove(wt->line_buf, wt->line_buf + consumed, remain); 439 wt->line_len = remain; 440 } 441} 442 443static bool wt_append_stdout(ant_worker_thread_t *wt, const char *data, size_t len) { 444 if (!wt || !data || len == 0) return true; 445 446 size_t needed = wt->line_len 
+ len; 447 if (needed + 1 > wt->line_cap) { 448 size_t cap = wt->line_cap ? wt->line_cap : 1024; 449 while (cap < needed + 1) cap *= 2; 450 char *next = (char *)realloc(wt->line_buf, cap); 451 if (!next) return false; 452 wt->line_buf = next; 453 wt->line_cap = cap; 454 } 455 456 memcpy(wt->line_buf + wt->line_len, data, len); 457 wt->line_len += len; 458 wt->line_buf[wt->line_len] = '\0'; 459 return true; 460} 461 462static void wt_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { 463 ant_worker_thread_t *wt = (ant_worker_thread_t *)stream->data; 464 if (!wt) { 465 free(buf->base); 466 return; 467 } 468 469 if (nread > 0) { 470 if (wt_append_stdout(wt, buf->base, (size_t)nread)) wt_process_lines(wt); 471 } else if (nread < 0) { 472 uv_read_stop(stream); 473 } 474 475 free(buf->base); 476} 477 478static char *wt_path_from_specifier(ant_t *js, ant_value_t spec) { 479 const char *raw = NULL; 480 size_t len = 0; 481 482 if (vtype(spec) == T_STR) { 483 raw = js_getstr(js, spec, &len); 484 } else if (is_object_type(spec)) { 485 ant_value_t pathname = js_get(js, spec, "pathname"); 486 if (vtype(pathname) == T_STR) raw = js_getstr(js, pathname, &len); 487 if (!raw) { 488 ant_value_t href = js_get(js, spec, "href"); 489 if (vtype(href) == T_STR) raw = js_getstr(js, href, &len); 490 } 491 } 492 493 if (!raw || len == 0) return NULL; 494 495 if (len >= 7 && strncmp(raw, "file://", 7) == 0) { 496 const char *p = raw + 7; 497 if (strncmp(p, "localhost/", 10) == 0) p += 9; 498 if (*p == '\0') return NULL; 499 return strndup(p, len - (size_t)(p - raw)); 500 } 501 502 return strndup(raw, len); 503} 504 505static int wt_spawn_worker( 506 ant_worker_thread_t *wt, 507 const char *script_path, 508 const char *worker_data_json, 509 const char *env_store_json 510) { 511 if (!wt || !wt->js || !script_path || !rt || !rt->argv || rt->argc <= 0) return UV_EINVAL; 512 513 uv_loop_t *loop = uv_default_loop(); 514 uv_pipe_init(loop, &wt->stdout_pipe, 0); 515 516 
uv_stdio_container_t stdio[3]; 517 stdio[0].flags = UV_IGNORE; 518 stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; 519 stdio[1].data.stream = (uv_stream_t *)&wt->stdout_pipe; 520 stdio[2].flags = UV_INHERIT_FD; 521 stdio[2].data.fd = 2; 522 523 char *argv0 = strdup(rt->argv[0]); 524 char *argv1 = strdup(script_path); 525 if (!argv0 || !argv1) { 526 free(argv0); 527 free(argv1); 528 uv_close((uv_handle_t *)&wt->stdout_pipe, NULL); 529 return UV_ENOMEM; 530 } 531 532 char *args[3] = {argv0, argv1, NULL}; 533 char **env = wt_build_worker_env(worker_data_json, env_store_json); 534 if (!env) { 535 free(argv0); 536 free(argv1); 537 uv_close((uv_handle_t *)&wt->stdout_pipe, NULL); 538 return UV_ENOMEM; 539 } 540 541 uv_process_options_t options; 542 memset(&options, 0, sizeof(options)); 543 options.file = argv0; 544 options.args = args; 545 options.env = env; 546 options.stdio_count = 3; 547 options.stdio = stdio; 548 options.exit_cb = wt_on_process_exit; 549 550 wt->process.data = wt; 551 wt->stdout_pipe.data = wt; 552 553 int rc = uv_spawn(loop, &wt->process, &options); 554 555 wt_free_env(env); 556 free(argv0); 557 free(argv1); 558 559 if (rc != 0) { 560 uv_close((uv_handle_t *)&wt->stdout_pipe, NULL); 561 return rc; 562 } 563 564 wt->spawned = true; 565 wt->refed = true; 566 567 rc = uv_read_start((uv_stream_t *)&wt->stdout_pipe, wt_alloc_cb, wt_read_cb); 568 if (rc != 0) { 569 uv_process_kill(&wt->process, SIGTERM); 570 } 571 return rc; 572} 573 574static ant_value_t worker_threads_worker_on(ant_t *js, ant_value_t *args, int nargs) { 575 if (nargs < 2 || vtype(args[0]) != T_STR || !is_callable(args[1])) { 576 return js_mkerr(js, "Worker.on(event, listener) requires (string, function)"); 577 } 578 579 ant_value_t this_obj = js_getthis(js); 580 if (!is_object_type(this_obj)) return js_mkerr(js, "invalid Worker receiver"); 581 582 size_t len = 0; 583 const char *event = js_getstr(js, args[0], &len); 584 if (!event) return js_mkerr(js, "invalid event name"); 585 586 
if (len == 7 && memcmp(event, "message", 7) == 0) { 587 js_set_slot(this_obj, SLOT_WT_ON_MESSAGE, args[1]); 588 } else if (len == 4 && memcmp(event, "exit", 4) == 0) { 589 js_set_slot(this_obj, SLOT_WT_ON_EXIT, args[1]); 590 } 591 592 return this_obj; 593} 594 595static ant_value_t worker_threads_worker_once(ant_t *js, ant_value_t *args, int nargs) { 596 if (nargs < 2 || vtype(args[0]) != T_STR || !is_callable(args[1])) { 597 return js_mkerr(js, "Worker.once(event, listener) requires (string, function)"); 598 } 599 600 ant_value_t this_obj = js_getthis(js); 601 if (!is_object_type(this_obj)) return js_mkerr(js, "invalid Worker receiver"); 602 603 size_t len = 0; 604 const char *event = js_getstr(js, args[0], &len); 605 if (!event) return js_mkerr(js, "invalid event name"); 606 607 if (len == 7 && memcmp(event, "message", 7) == 0) { 608 js_set_slot(this_obj, SLOT_WT_ONCE_MESSAGE, args[1]); 609 } else if (len == 4 && memcmp(event, "exit", 4) == 0) { 610 js_set_slot(this_obj, SLOT_WT_ONCE_EXIT, args[1]); 611 } 612 613 return this_obj; 614} 615 616static ant_value_t worker_threads_worker_unref(ant_t *js, ant_value_t *args, int nargs) { 617 ant_value_t this_obj = js_getthis(js); 618 ant_worker_thread_t *wt = wt_get_worker(js, this_obj); 619 if (!wt) return js_mkerr(js, "invalid Worker receiver"); 620 if (wt->spawned && wt->refed) { 621 if (!uv_is_closing((uv_handle_t *)&wt->process)) uv_unref((uv_handle_t *)&wt->process); 622 if (!uv_is_closing((uv_handle_t *)&wt->stdout_pipe)) uv_unref((uv_handle_t *)&wt->stdout_pipe); 623 wt->refed = false; 624 } 625 return this_obj; 626} 627 628static ant_value_t worker_threads_worker_ref(ant_t *js, ant_value_t *args, int nargs) { 629 ant_value_t this_obj = js_getthis(js); 630 ant_worker_thread_t *wt = wt_get_worker(js, this_obj); 631 if (!wt) return js_mkerr(js, "invalid Worker receiver"); 632 if (wt->spawned && !wt->refed) { 633 if (!uv_is_closing((uv_handle_t *)&wt->process)) uv_ref((uv_handle_t *)&wt->process); 634 if 
(!uv_is_closing((uv_handle_t *)&wt->stdout_pipe)) uv_ref((uv_handle_t *)&wt->stdout_pipe); 635 wt->refed = true; 636 } 637 return this_obj; 638} 639 640static ant_value_t worker_threads_worker_terminate(ant_t *js, ant_value_t *args, int nargs) { 641 ant_value_t this_obj = js_getthis(js); 642 ant_worker_thread_t *wt = wt_get_worker(js, this_obj); 643 if (!wt) return js_mkerr(js, "invalid Worker receiver"); 644 645 if (wt->exited) return wt_make_resolved_promise(js, js_mknum((double)wt->exit_status)); 646 647 if (!wt->has_terminate_val) { 648 ant_value_t p = js_mkpromise(js); 649 wt->terminate_val = p; 650 wt->has_terminate_val = true; 651 } 652 653 int rc = uv_process_kill(&wt->process, SIGTERM); 654 if (rc != 0) { 655 ant_value_t p = wt->terminate_val; 656 js_reject_promise(js, p, js_mkerr(js, "terminate failed: %s", uv_strerror(rc))); 657 wt->terminate_val = js_mkundef(); 658 wt->has_terminate_val = false; 659 return p; 660 } 661 662 return wt->terminate_val; 663} 664 665static ant_value_t worker_threads_worker_post_message(ant_t *js, ant_value_t *args, int nargs) { 666 return js_mkerr(js, "Worker.postMessage is not implemented yet"); 667} 668 669static ant_value_t worker_threads_message_port_post_message(ant_t *js, ant_value_t *args, int nargs) { 670 ant_value_t this_obj = js_getthis(js); 671 if (!wt_is_message_port(js, this_obj)) return js_mkerr(js, "invalid MessagePort receiver"); 672 if (wt_port_is_closed(js, this_obj)) return js_mkundef(); 673 674 ant_value_t peer = js_get_slot(this_obj, SLOT_WT_PORT_PEER); 675 if (!wt_is_message_port(js, peer) || wt_port_is_closed(js, peer)) return js_mkundef(); 676 677 ant_value_t value = (nargs > 0) ? 
args[0] : js_mkundef(); 678 wt_port_queue_push(js, peer, value); 679 wt_port_drain(js, peer); 680 return js_mkundef(); 681} 682 683static ant_value_t worker_threads_message_port_on(ant_t *js, ant_value_t *args, int nargs) { 684 if (nargs < 2 || vtype(args[0]) != T_STR || !is_callable(args[1])) { 685 return js_mkerr(js, "MessagePort.on(event, listener) requires (string, function)"); 686 } 687 ant_value_t this_obj = js_getthis(js); 688 if (!wt_is_message_port(js, this_obj)) return js_mkerr(js, "invalid MessagePort receiver"); 689 690 size_t len = 0; 691 const char *event = js_getstr(js, args[0], &len); 692 if (!event) return js_mkerr(js, "invalid event name"); 693 if (len == 7 && memcmp(event, "message", 7) == 0) { 694 js_set_slot(this_obj, SLOT_WT_PORT_ON_MESSAGE, args[1]); 695 js_set_slot(this_obj, SLOT_WT_PORT_STARTED, js_true); 696 wt_port_drain(js, this_obj); 697 } 698 return this_obj; 699} 700 701static ant_value_t worker_threads_message_port_once(ant_t *js, ant_value_t *args, int nargs) { 702 if (nargs < 2 || vtype(args[0]) != T_STR || !is_callable(args[1])) { 703 return js_mkerr(js, "MessagePort.once(event, listener) requires (string, function)"); 704 } 705 ant_value_t this_obj = js_getthis(js); 706 if (!wt_is_message_port(js, this_obj)) return js_mkerr(js, "invalid MessagePort receiver"); 707 708 size_t len = 0; 709 const char *event = js_getstr(js, args[0], &len); 710 if (!event) return js_mkerr(js, "invalid event name"); 711 if (len == 7 && memcmp(event, "message", 7) == 0) { 712 js_set_slot(this_obj, SLOT_WT_PORT_ONCE_MESSAGE, args[1]); 713 js_set_slot(this_obj, SLOT_WT_PORT_STARTED, js_true); 714 wt_port_drain(js, this_obj); 715 } 716 return this_obj; 717} 718 719static ant_value_t worker_threads_message_port_start(ant_t *js, ant_value_t *args, int nargs) { 720 ant_value_t this_obj = js_getthis(js); 721 if (!wt_is_message_port(js, this_obj)) return js_mkerr(js, "invalid MessagePort receiver"); 722 js_set_slot(this_obj, SLOT_WT_PORT_STARTED, js_true); 723 
wt_port_drain(js, this_obj); 724 return js_mkundef(); 725} 726 727static ant_value_t worker_threads_message_port_close(ant_t *js, ant_value_t *args, int nargs) { 728 ant_value_t this_obj = js_getthis(js); 729 if (!wt_is_message_port(js, this_obj)) return js_mkerr(js, "invalid MessagePort receiver"); 730 wt_port_set_closed(js, this_obj, true); 731 732 ant_value_t peer = js_get_slot(this_obj, SLOT_WT_PORT_PEER); 733 js_set_slot(this_obj, SLOT_WT_PORT_PEER, js_mknull()); 734 if (wt_is_message_port(js, peer)) js_set_slot(peer, SLOT_WT_PORT_PEER, js_mknull()); 735 return js_mkundef(); 736} 737 738static ant_value_t worker_threads_message_port_ref(ant_t *js, ant_value_t *args, int nargs) { 739 return js_mkundef(); 740} 741 742static ant_value_t worker_threads_message_port_unref(ant_t *js, ant_value_t *args, int nargs) { 743 return js_mkundef(); 744} 745 746static ant_value_t worker_threads_message_port_ctor(ant_t *js, ant_value_t *args, int nargs) { 747 return js_mkerr(js, "MessagePort constructor is not public"); 748} 749 750static ant_value_t worker_threads_message_channel_ctor(ant_t *js, ant_value_t *args, int nargs) { 751 if (vtype(js->new_target) == T_UNDEF) { 752 return js_mkerr(js, "MessageChannel constructor requires 'new'"); 753 } 754 755 ant_value_t this_obj = js_getthis(js); 756 ant_value_t port1 = wt_make_message_port(js); 757 ant_value_t port2 = wt_make_message_port(js); 758 js_set_slot(port1, SLOT_WT_PORT_PEER, port2); 759 js_set_slot(port2, SLOT_WT_PORT_PEER, port1); 760 js_set(js, this_obj, "port1", port1); 761 js_set(js, this_obj, "port2", port2); 762 return this_obj; 763} 764 765static ant_value_t worker_threads_worker_ctor(ant_t *js, ant_value_t *args, int nargs) { 766 ant_value_t this_obj = js_getthis(js); 767 ant_value_t proto = js_instance_proto_from_new_target(js, js_mkundef()); 768 769 if (vtype(js->new_target) == T_UNDEF) { 770 return js_mkerr(js, "Worker constructor requires 'new'"); 771 } 772 if (nargs < 1) return js_mkerr(js, "Worker() 
requires a filename or URL"); 773 if (is_object_type(this_obj) && is_object_type(proto)) js_set_proto_init(this_obj, proto); 774 775 char *script_path = wt_path_from_specifier(js, args[0]); 776 if (!script_path) return js_mkerr(js, "Invalid Worker filename/URL"); 777 778 const char *worker_data_json = NULL; 779 const char *env_store_json = NULL; 780 char *worker_data_heap = NULL; 781 char *env_store_heap = NULL; 782 if (nargs >= 2 && is_object_type(args[1])) { 783 ant_value_t worker_data = js_get(js, args[1], "workerData"); 784 if (!is_undefined(worker_data)) { 785 ant_value_t stringify_args[1] = {worker_data}; 786 ant_value_t json = js_json_stringify(js, stringify_args, 1); 787 if (vtype(json) != T_STR) { 788 free(script_path); 789 return js_mkerr(js, "Worker options.workerData must be JSON-serializable"); 790 } 791 size_t len = 0; 792 const char *raw = js_getstr(js, json, &len); 793 if (!raw) { 794 free(script_path); 795 return js_mkerr(js, "Failed to stringify workerData"); 796 } 797 worker_data_heap = strndup(raw, len); 798 if (!worker_data_heap) { 799 free(script_path); 800 return js_mkerr(js, "Out of memory"); 801 } 802 worker_data_json = worker_data_heap; 803 } 804 } 805 806 ant_value_t env_store = wt_get_or_create_env_store(js); 807 ant_value_t env_stringify_args[1] = {env_store}; 808 ant_value_t env_json = js_json_stringify(js, env_stringify_args, 1); 809 if (vtype(env_json) != T_STR) { 810 free(script_path); 811 free(worker_data_heap); 812 return js_mkerr(js, "setEnvironmentData values must be JSON-serializable"); 813 } 814 size_t env_len = 0; 815 const char *env_raw = js_getstr(js, env_json, &env_len); 816 if (!env_raw) { 817 free(script_path); 818 free(worker_data_heap); 819 return js_mkerr(js, "Failed to snapshot environment data"); 820 } 821 env_store_heap = strndup(env_raw, env_len); 822 if (!env_store_heap) { 823 free(script_path); 824 free(worker_data_heap); 825 return js_mkerr(js, "Out of memory"); 826 } 827 env_store_json = env_store_heap; 828 
829 ant_worker_thread_t *wt = (ant_worker_thread_t *)calloc(1, sizeof(*wt)); 830 if (!wt) { 831 free(script_path); 832 free(worker_data_heap); 833 free(env_store_heap); 834 return js_mkerr(js, "Out of memory"); 835 } 836 837 wt->js = js; 838 wt->self_val = this_obj; 839 js_set_slot(this_obj, SLOT_DATA, ANT_PTR(wt)); 840 841 wt->prev = NULL; 842 wt->next = active_workers_head; 843 if (active_workers_head) active_workers_head->prev = wt; 844 active_workers_head = wt; 845 846 int rc = wt_spawn_worker(wt, script_path, worker_data_json, env_store_json); 847 free(script_path); 848 free(worker_data_heap); 849 free(env_store_heap); 850 851 if (rc != 0) { 852 js_set_slot(this_obj, SLOT_DATA, js_mkundef()); 853 wt_cleanup(wt); 854 return js_mkerr(js, "Failed to spawn Worker: %s", uv_strerror(rc)); 855 } 856 857 return this_obj; 858} 859 860static ant_value_t worker_threads_parent_post_message(ant_t *js, ant_value_t *args, int nargs) { 861 ant_value_t value = (nargs > 0) ? args[0] : js_mkundef(); 862 ant_value_t stringify_args[1] = {value}; 863 ant_value_t json = js_json_stringify(js, stringify_args, 1); 864 if (vtype(json) != T_STR) return js_mkerr(js, "parentPort.postMessage payload must be JSON-serializable"); 865 866 size_t json_len = 0; 867 const char *json_str = js_getstr(js, json, &json_len); 868 if (!json_str) return js_mkerr(js, "Failed to serialize message"); 869 870 size_t prefix_len = strlen(WT_MSG_PREFIX); 871 size_t line_len = prefix_len + json_len + 1; 872 char *line = (char *)malloc(line_len + 1); 873 if (!line) return js_mkerr(js, "Out of memory"); 874 875 memcpy(line, WT_MSG_PREFIX, prefix_len); 876 memcpy(line + prefix_len, json_str, json_len); 877 line[prefix_len + json_len] = '\n'; 878 line[prefix_len + json_len + 1] = '\0'; 879 880 ssize_t wrote = WT_WRITE(WT_STDOUT_FD, line, (unsigned int)line_len); 881 free(line); 882 if (wrote < 0) return js_mkerr(js, "parentPort.postMessage failed"); 883 return js_mkundef(); 884} 885 886static ant_value_t 
// setEnvironmentData(key, value)
//
// Stores a JSON round-tripped copy of `value` in the environment-data store.
// The property name used in the store is the JSON text of `key`. Returns
// undefined on success, or an error value when fewer than two arguments are
// given, when key or value cannot be JSON-serialized, or on allocation
// failure.
static ant_value_t worker_threads_set_environment_data(ant_t *js, ant_value_t *args, int nargs) {
  if (nargs < 2) return js_mkerr(js, "setEnvironmentData(key, value) requires 2 arguments");

  ant_value_t key_arg[1] = {args[0]};
  ant_value_t key_text = js_json_stringify(js, key_arg, 1);
  if (vtype(key_text) != T_STR) {
    return js_mkerr(js, "setEnvironmentData key must be JSON-serializable");
  }

  ant_value_t value_arg[1] = {args[1]};
  ant_value_t value_text = js_json_stringify(js, value_arg, 1);
  if (vtype(value_text) != T_STR) {
    return js_mkerr(js, "setEnvironmentData value must be JSON-serializable");
  }

  // Parse the serialized text back so the store holds its own copy.
  ant_value_t snapshot = json_parse_value(js, value_text);
  if (is_err(snapshot)) {
    return js_mkerr(js, "setEnvironmentData value must be JSON-serializable");
  }

  size_t name_len = 0;
  const char *name_src = js_getstr(js, key_text, &name_len);
  if (name_src == NULL) return js_mkerr(js, "Failed to serialize environment data key");

  // js_set() is given a NUL-terminated C string, so make a terminated copy.
  char *name = strndup(name_src, name_len);
  if (name == NULL) return js_mkerr(js, "Out of memory");

  ant_value_t store = wt_get_or_create_env_store(js);
  js_set(js, store, name, snapshot);
  free(name);
  return js_mkundef();
}
worker_threads_get_environment_data(ant_t *js, ant_value_t *args, int nargs) { 933 if (nargs < 1) return js_mkundef(); 934 935 ant_value_t key_stringify_args[1] = {args[0]}; 936 ant_value_t key_json = js_json_stringify(js, key_stringify_args, 1); 937 if (vtype(key_json) != T_STR) return js_mkundef(); 938 939 size_t key_len = 0; 940 const char *key_ptr = js_getstr(js, key_json, &key_len); 941 if (!key_ptr) return js_mkundef(); 942 943 char *key = strndup(key_ptr, key_len); 944 if (!key) return js_mkerr(js, "Out of memory"); 945 946 ant_value_t store = wt_get_or_create_env_store(js); 947 ant_offset_t off = lkp(js, store, key, key_len); 948 if (off == 0) { 949 free(key); 950 return js_mkundef(); 951 } 952 953 ant_value_t value = js_get(js, store, key); 954 free(key); 955 return value; 956} 957 958static ant_value_t worker_threads_move_message_port_to_context(ant_t *js, ant_value_t *args, int nargs) { 959 if (nargs < 1 || !wt_is_message_port(js, args[0])) { 960 return js_mkerr(js, "moveMessagePortToContext(port, context) requires a MessagePort"); 961 } 962 return args[0]; 963} 964 965void gc_mark_worker_threads(ant_t *js, gc_mark_fn mark) { 966 for (ant_worker_thread_t *wt = active_workers_head; wt; wt = wt->next) { 967 mark(js, wt->self_val); 968 if (wt->has_terminate_val) mark(js, wt->terminate_val); 969 } 970} 971 972ant_value_t worker_threads_library(ant_t *js) { 973 ant_value_t lib = js_mkobj(js); 974 bool is_worker = wt_is_worker_mode(); 975 976 wt_init_env_store(js, is_worker); 977 978 js_set(js, lib, "isMainThread", js_bool(!is_worker)); 979 js_set(js, lib, "threadId", js_mknum((double)(is_worker ? 
rt->pid : 0))); 980 js_set(js, lib, "SHARE_ENV", js_mksym(js, "SHARE_ENV")); 981 982 ant_value_t message_port_ctor_obj = js_mkobj(js); 983 ant_value_t message_port_proto = js_mkobj(js); 984 js_set(js, message_port_proto, "postMessage", js_mkfun(worker_threads_message_port_post_message)); 985 js_set(js, message_port_proto, "on", js_mkfun(worker_threads_message_port_on)); 986 js_set(js, message_port_proto, "once", js_mkfun(worker_threads_message_port_once)); 987 js_set(js, message_port_proto, "start", js_mkfun(worker_threads_message_port_start)); 988 js_set(js, message_port_proto, "close", js_mkfun(worker_threads_message_port_close)); 989 js_set(js, message_port_proto, "ref", js_mkfun(worker_threads_message_port_ref)); 990 js_set(js, message_port_proto, "unref", js_mkfun(worker_threads_message_port_unref)); 991 js_set_sym(js, message_port_proto, get_toStringTag_sym(), js_mkstr(js, "MessagePort", 11)); 992 993 js_set_slot(message_port_ctor_obj, SLOT_CFUNC, js_mkfun(worker_threads_message_port_ctor)); 994 js_mkprop_fast(js, message_port_ctor_obj, "prototype", 9, message_port_proto); 995 js_mkprop_fast(js, message_port_ctor_obj, "name", 4, js_mkstr(js, "MessagePort", 11)); 996 js_set_descriptor(js, message_port_ctor_obj, "name", 4, 0); 997 js_set(js, lib, "MessagePort", js_obj_to_func(message_port_ctor_obj)); 998 js_set_slot(js->global, SLOT_WT_PORT_PROTO, message_port_proto); 999 1000 ant_value_t message_channel_ctor_obj = js_mkobj(js); 1001 ant_value_t message_channel_proto = js_mkobj(js); 1002 js_set_sym(js, message_channel_proto, get_toStringTag_sym(), js_mkstr(js, "MessageChannel", 14)); 1003 js_set_slot(message_channel_ctor_obj, SLOT_CFUNC, js_mkfun(worker_threads_message_channel_ctor)); 1004 js_mkprop_fast(js, message_channel_ctor_obj, "prototype", 9, message_channel_proto); 1005 js_mkprop_fast(js, message_channel_ctor_obj, "name", 4, js_mkstr(js, "MessageChannel", 14)); 1006 js_set_descriptor(js, message_channel_ctor_obj, "name", 4, 0); 1007 js_set(js, lib, 
"MessageChannel", js_obj_to_func(message_channel_ctor_obj)); 1008 1009 if (is_worker) { 1010 ant_value_t parent_port = js_mkobj(js); 1011 js_set(js, parent_port, "postMessage", js_mkfun(worker_threads_parent_post_message)); 1012 js_set(js, parent_port, "unref", js_mkfun(worker_threads_parent_unref)); 1013 js_set(js, parent_port, "ref", js_mkfun(worker_threads_parent_unref)); 1014 js_set(js, lib, "parentPort", parent_port); 1015 1016 const char *worker_data_json = getenv(WT_ENV_DATA_JSON); 1017 if (worker_data_json && worker_data_json[0]) { 1018 ant_value_t raw = js_mkstr(js, worker_data_json, strlen(worker_data_json)); 1019 ant_value_t parsed = json_parse_value(js, raw); 1020 js_set(js, lib, "workerData", is_err(parsed) ? js_mkundef() : parsed); 1021 } else js_set(js, lib, "workerData", js_mkundef()); 1022 } else { 1023 js_set(js, lib, "parentPort", js_mknull()); 1024 js_set(js, lib, "workerData", js_mkundef()); 1025 } 1026 1027 ant_value_t worker_ctor_obj = js_mkobj(js); 1028 ant_value_t worker_proto = js_mkobj(js); 1029 1030 js_set(js, worker_proto, "on", js_mkfun(worker_threads_worker_on)); 1031 js_set(js, worker_proto, "once", js_mkfun(worker_threads_worker_once)); 1032 js_set(js, worker_proto, "terminate", js_mkfun(worker_threads_worker_terminate)); 1033 js_set(js, worker_proto, "unref", js_mkfun(worker_threads_worker_unref)); 1034 js_set(js, worker_proto, "ref", js_mkfun(worker_threads_worker_ref)); 1035 js_set(js, worker_proto, "postMessage", js_mkfun(worker_threads_worker_post_message)); 1036 js_set_sym(js, worker_proto, get_toStringTag_sym(), js_mkstr(js, "Worker", 6)); 1037 1038 js_set_slot(worker_ctor_obj, SLOT_CFUNC, js_mkfun(worker_threads_worker_ctor)); 1039 js_mkprop_fast(js, worker_ctor_obj, "prototype", 9, worker_proto); 1040 js_mkprop_fast(js, worker_ctor_obj, "name", 4, js_mkstr(js, "Worker", 6)); 1041 js_set_descriptor(js, worker_ctor_obj, "name", 4, 0); 1042 js_set(js, lib, "Worker", js_obj_to_func(worker_ctor_obj)); 1043 1044 js_set(js, lib, 
"markAsUntransferable", js_mkfun(worker_threads_mark_as_untransferable)); 1045 js_set(js, lib, "receiveMessageOnPort", js_mkfun(worker_threads_receive_message_on_port)); 1046 js_set(js, lib, "setEnvironmentData", js_mkfun(worker_threads_set_environment_data)); 1047 js_set(js, lib, "getEnvironmentData", js_mkfun(worker_threads_get_environment_data)); 1048 js_set(js, lib, "moveMessagePortToContext", js_mkfun(worker_threads_move_message_port_to_context)); 1049 1050 js_set_sym(js, lib, get_toStringTag_sym(), js_mkstr(js, "worker_threads", 14)); 1051 return lib; 1052}