MIRROR: javascript for 馃悳's, a tiny runtime with big ambitions
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 1053 lines 36 kB view raw
1// stub: minimal node:worker_threads implementation 2// just enough for rolldown to run transforms 3 4#include <compat.h> // IWYU pragma: keep 5 6#include <uv.h> 7#include <stdio.h> 8#include <stdlib.h> 9#include <string.h> 10#include <signal.h> 11 12#ifdef _WIN32 13#include <io.h> 14#define WT_WRITE _write 15#define WT_STDOUT_FD 1 16extern char **_environ; 17#define environ _environ 18#else 19#include <unistd.h> 20#define WT_WRITE write 21#define WT_STDOUT_FD STDOUT_FILENO 22extern char **environ; 23#endif 24 25#include "ant.h" 26#include "ptr.h" 27#include "internal.h" 28#include "runtime.h" 29#include "descriptors.h" 30#include "silver/engine.h" 31#include "modules/json.h" 32#include "modules/symbol.h" 33#include "modules/worker_threads.h" 34#include "gc/modules.h" 35 36#define WT_ENV_MODE "ANT_WORKER_THREADS_MODE" 37#define WT_ENV_DATA_JSON "ANT_WORKER_DATA_JSON" 38#define WT_ENV_STORE_JSON "ANT_WORKER_ENV_DATA_JSON" 39#define WT_MSG_PREFIX "ANT_WT_MSG:" 40 41typedef struct ant_worker_thread { 42 ant_t *js; 43 uv_process_t process; 44 uv_pipe_t stdout_pipe; 45 bool spawned; 46 bool exited; 47 bool closing; 48 bool closed; 49 bool refed; 50 int close_pending; 51 int64_t exit_status; 52 int term_signal; 53 char *line_buf; 54 size_t line_len; 55 size_t line_cap; 56 ant_value_t self_val; 57 ant_value_t terminate_val; 58 bool has_terminate_val; 59 struct ant_worker_thread *next; 60 struct ant_worker_thread *prev; 61} ant_worker_thread_t; 62 63static ant_worker_thread_t *active_workers_head = NULL; 64 65enum { WORKER_NATIVE_TAG = 0x57524b52u }; // WRKR 66 67static bool wt_is_worker_mode(void) { 68 const char *mode = getenv(WT_ENV_MODE); 69 return mode && strcmp(mode, "1") == 0; 70} 71 72static ant_value_t wt_get_or_create_env_store(ant_t *js) { 73 ant_value_t store = js_get_slot(js->global, SLOT_WT_ENV_STORE); 74 if (!is_object_type(store)) { 75 store = js_mkobj(js); 76 js_set_slot(js->global, SLOT_WT_ENV_STORE, store); 77 } 78 return store; 79} 80 81static void wt_init_env_store(ant_t *js, bool is_worker) { 82 ant_value_t store = js_mkobj(js); 83 if (is_worker) { 84 const char *raw = getenv(WT_ENV_STORE_JSON); 85 if (raw && raw[0]) { 86 ant_value_t input = js_mkstr(js, raw, strlen(raw)); 87 ant_value_t parsed = json_parse_value(js, input); 88 if (is_object_type(parsed)) store = parsed; 89 } 90 } 91 js_set_slot(js->global, SLOT_WT_ENV_STORE, store); 92} 93 94static ant_worker_thread_t *wt_get_worker(ant_t *js, ant_value_t this_obj) { 95 if (!is_object_type(this_obj)) return NULL; 96 return (ant_worker_thread_t *)js_get_native(this_obj, WORKER_NATIVE_TAG); 97} 98 99static bool wt_is_message_port(ant_t *js, ant_value_t obj) { 100 if (!is_object_type(obj)) return false; 101 ant_value_t tag = js_get_slot(obj, SLOT_WT_PORT_TAG); 102 return js_truthy(js, tag); 103} 104 105static ant_value_t wt_get_message_port_proto(ant_t *js) { 106 ant_value_t proto = js_get_slot(js->global, SLOT_WT_PORT_PROTO); 107 if (!is_object_type(proto)) proto = js_mkobj(js); 108 return proto; 109} 110 111static ant_value_t wt_make_message_port(ant_t *js) { 112 ant_value_t port = js_mkobj(js); 113 js_set_proto_init(port, wt_get_message_port_proto(js)); 114 js_set_slot(port, SLOT_WT_PORT_TAG, js_true); 115 js_set_slot(port, SLOT_WT_PORT_QUEUE, js_mkarr(js)); 116 js_set_slot(port, SLOT_WT_PORT_HEAD, js_mknum(0)); 117 js_set_slot(port, SLOT_WT_PORT_PEER, js_mknull()); 118 js_set_slot(port, SLOT_WT_PORT_CLOSED, js_false); 119 js_set_slot(port, SLOT_WT_PORT_STARTED, js_false); 120 js_set_slot(port, SLOT_WT_PORT_ON_MESSAGE, js_mkundef()); 121 js_set_slot(port, SLOT_WT_PORT_ONCE_MESSAGE, js_mkundef()); 122 js_set(js, port, "onmessage", js_mkundef()); 123 return port; 124} 125 126static bool wt_port_is_closed(ant_t *js, ant_value_t port) { 127 return js_truthy(js, js_get_slot(port, SLOT_WT_PORT_CLOSED)); 128} 129 130static void wt_port_set_closed(ant_t *js, ant_value_t port, bool closed) { 131 js_set_slot(port, SLOT_WT_PORT_CLOSED, js_bool(closed)); 132} 133 134static bool wt_port_queue_dequeue(ant_t *js, ant_value_t port, ant_value_t *out) { 135 ant_value_t queue = js_get_slot(port, SLOT_WT_PORT_QUEUE); 136 if (vtype(queue) != T_ARR) return false; 137 138 ant_value_t head_val = js_get_slot(port, SLOT_WT_PORT_HEAD); 139 ant_offset_t head = (vtype(head_val) == T_NUM) ? (ant_offset_t)js_getnum(head_val) : 0; 140 ant_offset_t len = js_arr_len(js, queue); 141 if (len <= 0 || head >= len) { 142 js_set_slot(port, SLOT_WT_PORT_QUEUE, js_mkarr(js)); 143 js_set_slot(port, SLOT_WT_PORT_HEAD, js_mknum(0)); 144 return false; 145 } 146 147 if (out) *out = js_arr_get(js, queue, head); 148 ant_offset_t next_head = head + 1; 149 150 if (next_head >= len) { 151 js_set_slot(port, SLOT_WT_PORT_QUEUE, js_mkarr(js)); 152 js_set_slot(port, SLOT_WT_PORT_HEAD, js_mknum(0)); 153 return true; 154 } 155 156 if (next_head > 32 && next_head * 2 >= len) { 157 ant_value_t compact = js_mkarr(js); 158 for (ant_offset_t i = next_head; i < len; i++) js_arr_push(js, compact, js_arr_get(js, queue, i)); 159 js_set_slot(port, SLOT_WT_PORT_QUEUE, compact); 160 js_set_slot(port, SLOT_WT_PORT_HEAD, js_mknum(0)); 161 return true; 162 } 163 164 js_set_slot(port, SLOT_WT_PORT_HEAD, js_mknum((double)next_head)); 165 return true; 166} 167 168static void wt_port_queue_push(ant_t *js, ant_value_t port, ant_value_t value) { 169 ant_value_t queue = js_get_slot(port, SLOT_WT_PORT_QUEUE); 170 if (vtype(queue) != T_ARR) { 171 queue = js_mkarr(js); 172 js_set_slot(port, SLOT_WT_PORT_QUEUE, queue); 173 js_set_slot(port, SLOT_WT_PORT_HEAD, js_mknum(0)); 174 } 175 js_arr_push(js, queue, value); 176} 177 178static void wt_port_call_listener(ant_t *js, ant_value_t this_obj, ant_value_t fn, ant_value_t arg) { 179 if (!is_callable(fn)) return; 180 ant_value_t argv[1] = {arg}; 181 182 sv_vm_call(js->vm, js, fn, this_obj, argv, 1, NULL, false); 183} 184 185static bool wt_port_should_deliver(ant_t *js, ant_value_t port) { 186 if (wt_port_is_closed(js, port)) return false; 187 bool started = js_truthy(js, js_get_slot(port, SLOT_WT_PORT_STARTED)); 188 ant_value_t on_fn = js_get_slot(port, SLOT_WT_PORT_ON_MESSAGE); 189 ant_value_t once_fn = js_get_slot(port, SLOT_WT_PORT_ONCE_MESSAGE); 190 ant_value_t onmessage = js_get(js, port, "onmessage"); 191 192 bool has_event_listener = is_callable(on_fn) || is_callable(once_fn); 193 if (is_callable(onmessage)) return true; 194 return started && has_event_listener; 195} 196 197static void wt_port_drain(ant_t *js, ant_value_t port) { 198 if (!wt_is_message_port(js, port)) return; 199 200 while (wt_port_should_deliver(js, port)) { 201 ant_value_t msg = js_mkundef(); 202 if (!wt_port_queue_dequeue(js, port, &msg)) break; 203 204 ant_value_t on_fn = js_get_slot(port, SLOT_WT_PORT_ON_MESSAGE); 205 wt_port_call_listener(js, port, on_fn, msg); 206 207 ant_value_t once_fn = js_get_slot(port, SLOT_WT_PORT_ONCE_MESSAGE); 208 if (is_callable(once_fn)) { 209 wt_port_call_listener(js, port, once_fn, msg); 210 js_set_slot(port, SLOT_WT_PORT_ONCE_MESSAGE, js_mkundef()); 211 } 212 213 ant_value_t onmessage = js_get(js, port, "onmessage"); 214 if (is_callable(onmessage)) { 215 ant_value_t event_obj = js_mkobj(js); 216 js_set(js, event_obj, "data", msg); 217 wt_port_call_listener(js, port, onmessage, event_obj); 218 } 219 220 if (wt_port_is_closed(js, port)) break; 221 } 222} 223 224static ant_value_t wt_make_resolved_promise(ant_t *js, ant_value_t value) { 225 ant_value_t p = js_mkpromise(js); 226 js_resolve_promise(js, p, value); 227 return p; 228} 229 230static void wt_call_listener(ant_t *js, ant_value_t this_obj, ant_value_t fn, ant_value_t arg) { 231 if (!is_callable(fn)) return; 232 ant_value_t argv[1] = {arg}; 233 234 sv_vm_call(js->vm, js, fn, this_obj, argv, 1, NULL, false); 235} 236 237static void wt_emit(ant_worker_thread_t *wt, const char *event, ant_value_t arg) { 238 if (!wt || !wt->js) return; 239 ant_t *js = wt->js; 240 ant_value_t this_obj = wt->self_val; 241 if (!is_object_type(this_obj)) return; 242 243 internal_slot_t on_slot, once_slot; 244 if (strcmp(event, "message") == 0) { 245 on_slot = SLOT_WT_ON_MESSAGE; 246 once_slot = SLOT_WT_ONCE_MESSAGE; 247 } else if (strcmp(event, "exit") == 0) { 248 on_slot = SLOT_WT_ON_EXIT; 249 once_slot = SLOT_WT_ONCE_EXIT; 250 } else return; 251 252 ant_value_t on_fn = js_get_slot(this_obj, on_slot); 253 wt_call_listener(js, this_obj, on_fn, arg); 254 255 ant_value_t once_fn = js_get_slot(this_obj, once_slot); 256 if (is_callable(once_fn)) { 257 wt_call_listener(js, this_obj, once_fn, arg); 258 js_set_slot(this_obj, once_slot, js_mkundef()); 259 } 260} 261 262static void wt_free_env(char **env) { 263 if (!env) return; 264 for (char **p = env; *p; p++) free(*p); 265 free(env); 266} 267 268static char **wt_build_worker_env(const char *worker_data_json, const char *env_store_json) { 269 size_t count = 0; 270 if (environ) { 271 while (environ[count]) count++; 272 } 273 274 size_t extra = 2; 275 if (worker_data_json) extra++; 276 if (env_store_json) extra++; 277 char **env = (char **)calloc(count + extra, sizeof(char *)); 278 if (!env) return NULL; 279 280 size_t out = 0; 281 for (size_t i = 0; i < count; i++) { 282 env[out] = strdup(environ[i]); 283 if (!env[out]) { 284 wt_free_env(env); 285 return NULL; 286 } 287 out++; 288 } 289 290 env[out++] = strdup(WT_ENV_MODE "=1"); 291 if (!env[out - 1]) { 292 wt_free_env(env); 293 return NULL; 294 } 295 296 if (worker_data_json) { 297 size_t key_len = strlen(WT_ENV_DATA_JSON); 298 size_t val_len = strlen(worker_data_json); 299 char *entry = (char *)malloc(key_len + 1 + val_len + 1); 300 if (!entry) { 301 wt_free_env(env); 302 return NULL; 303 } 304 memcpy(entry, WT_ENV_DATA_JSON, key_len); 305 entry[key_len] = '='; 306 memcpy(entry + key_len + 1, worker_data_json, val_len); 307 entry[key_len + 1 + val_len] = '\0'; 308 env[out++] = entry; 309 } 310 311 if (env_store_json) { 312 size_t key_len = strlen(WT_ENV_STORE_JSON); 313 size_t val_len = strlen(env_store_json); 314 char *entry = (char *)malloc(key_len + 1 + val_len + 1); 315 if (!entry) { 316 wt_free_env(env); 317 return NULL; 318 } 319 memcpy(entry, WT_ENV_STORE_JSON, key_len); 320 entry[key_len] = '='; 321 memcpy(entry + key_len + 1, env_store_json, val_len); 322 entry[key_len + 1 + val_len] = '\0'; 323 env[out++] = entry; 324 } 325 326 env[out] = NULL; 327 return env; 328} 329 330static void wt_cleanup(ant_worker_thread_t *wt) { 331 if (!wt) return; 332 free(wt->line_buf); 333 free(wt); 334} 335 336static void wt_detach(ant_worker_thread_t *wt) { 337 if (!wt) return; 338 339 if (wt->prev) wt->prev->next = wt->next; 340 else active_workers_head = wt->next; 341 if (wt->next) wt->next->prev = wt->prev; 342 wt->prev = NULL; 343 wt->next = NULL; 344 345 wt->self_val = js_mkundef(); 346 wt->terminate_val = js_mkundef(); 347 wt->has_terminate_val = false; 348 free(wt->line_buf); 349 wt->line_buf = NULL; 350 wt->line_len = 0; 351 wt->line_cap = 0; 352 wt->spawned = false; 353 wt->refed = false; 354 wt->closed = true; 355 wt->js = NULL; 356} 357 358static void wt_on_handle_closed(uv_handle_t *h) { 359 ant_worker_thread_t *wt = (ant_worker_thread_t *)h->data; 360 if (!wt) return; 361 if (wt->close_pending > 0) wt->close_pending--; 362 if (wt->close_pending == 0) wt_detach(wt); 363} 364 365static void wt_finish_exit(ant_worker_thread_t *wt) { 366 if (!wt) return; 367 if (wt->closed || wt->closing) return; 368 wt->closing = true; 369 wt->spawned = false; 370 wt->refed = false; 371 wt->close_pending = 0; 372 373 if (!uv_is_closing((uv_handle_t *)&wt->stdout_pipe)) { 374 wt->close_pending++; 375 uv_close((uv_handle_t *)&wt->stdout_pipe, wt_on_handle_closed); 376 } 377 if (!uv_is_closing((uv_handle_t *)&wt->process)) { 378 wt->close_pending++; 379 uv_close((uv_handle_t *)&wt->process, wt_on_handle_closed); 380 } 381 382 if (wt->close_pending == 0) wt_detach(wt); 383} 384 385static void wt_on_process_exit(uv_process_t *proc, int64_t exit_status, int term_signal) { 386 ant_worker_thread_t *wt = (ant_worker_thread_t *)proc->data; 387 if (!wt || !wt->js) return; 388 389 wt->exited = true; 390 wt->exit_status = exit_status; 391 wt->term_signal = term_signal; 392 393 if (wt->has_terminate_val) { 394 ant_value_t p = wt->terminate_val; 395 js_resolve_promise(wt->js, p, js_mknum((double)exit_status)); 396 wt->terminate_val = js_mkundef(); 397 wt->has_terminate_val = false; 398 } 399 400 wt_emit(wt, "exit", js_mknum((double)exit_status)); 401 wt_finish_exit(wt); 402} 403 404static void wt_alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { 405 buf->base = (char *)malloc(suggested_size); 406 buf->len = buf->base ? suggested_size : 0; 407} 408 409static void wt_emit_message_from_json(ant_worker_thread_t *wt, const char *json, size_t len) { 410 if (!wt || !wt->js) return; 411 ant_t *js = wt->js; 412 413 ant_value_t s = js_mkstr(js, json, len); 414 ant_value_t msg = json_parse_value(js, s); 415 if (is_err(msg)) msg = s; 416 417 wt_emit(wt, "message", msg); 418} 419 420static void wt_process_lines(ant_worker_thread_t *wt) { 421 if (!wt || !wt->line_buf) return; 422 423 for (;;) { 424 char *nl = memchr(wt->line_buf, '\n', wt->line_len); 425 if (!nl) break; 426 427 size_t line_len = (size_t)(nl - wt->line_buf); 428 if (line_len > 0 && wt->line_buf[line_len - 1] == '\r') line_len--; 429 430 size_t prefix_len = strlen(WT_MSG_PREFIX); 431 if (line_len >= prefix_len && memcmp(wt->line_buf, WT_MSG_PREFIX, prefix_len) == 0) { 432 const char *payload = wt->line_buf + prefix_len; 433 size_t payload_len = line_len - prefix_len; 434 wt_emit_message_from_json(wt, payload, payload_len); 435 } 436 437 size_t consumed = (size_t)(nl - wt->line_buf) + 1; 438 size_t remain = wt->line_len - consumed; 439 memmove(wt->line_buf, wt->line_buf + consumed, remain); 440 wt->line_len = remain; 441 } 442} 443 444static bool wt_append_stdout(ant_worker_thread_t *wt, const char *data, size_t len) { 445 if (!wt || !data || len == 0) return true; 446 447 size_t needed = wt->line_len + len; 448 if (needed + 1 > wt->line_cap) { 449 size_t cap = wt->line_cap ? wt->line_cap : 1024; 450 while (cap < needed + 1) cap *= 2; 451 char *next = (char *)realloc(wt->line_buf, cap); 452 if (!next) return false; 453 wt->line_buf = next; 454 wt->line_cap = cap; 455 } 456 457 memcpy(wt->line_buf + wt->line_len, data, len); 458 wt->line_len += len; 459 wt->line_buf[wt->line_len] = '\0'; 460 return true; 461} 462 463static void wt_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { 464 ant_worker_thread_t *wt = (ant_worker_thread_t *)stream->data; 465 if (!wt) { 466 free(buf->base); 467 return; 468 } 469 470 if (nread > 0) { 471 if (wt_append_stdout(wt, buf->base, (size_t)nread)) wt_process_lines(wt); 472 } else if (nread < 0) { 473 uv_read_stop(stream); 474 } 475 476 free(buf->base); 477} 478 479static char *wt_path_from_specifier(ant_t *js, ant_value_t spec) { 480 const char *raw = NULL; 481 size_t len = 0; 482 483 if (vtype(spec) == T_STR) { 484 raw = js_getstr(js, spec, &len); 485 } else if (is_object_type(spec)) { 486 ant_value_t pathname = js_get(js, spec, "pathname"); 487 if (vtype(pathname) == T_STR) raw = js_getstr(js, pathname, &len); 488 if (!raw) { 489 ant_value_t href = js_get(js, spec, "href"); 490 if (vtype(href) == T_STR) raw = js_getstr(js, href, &len); 491 } 492 } 493 494 if (!raw || len == 0) return NULL; 495 496 if (len >= 7 && strncmp(raw, "file://", 7) == 0) { 497 const char *p = raw + 7; 498 if (strncmp(p, "localhost/", 10) == 0) p += 9; 499 if (*p == '\0') return NULL; 500 return strndup(p, len - (size_t)(p - raw)); 501 } 502 503 return strndup(raw, len); 504} 505 506static int wt_spawn_worker( 507 ant_worker_thread_t *wt, 508 const char *script_path, 509 const char *worker_data_json, 510 const char *env_store_json 511) { 512 if (!wt || !wt->js || !script_path || !rt || !rt->argv || rt->argc <= 0) return UV_EINVAL; 513 514 uv_loop_t *loop = uv_default_loop(); 515 uv_pipe_init(loop, &wt->stdout_pipe, 0); 516 517 uv_stdio_container_t stdio[3]; 518 stdio[0].flags = UV_IGNORE; 519 stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; 520 stdio[1].data.stream = (uv_stream_t *)&wt->stdout_pipe; 521 stdio[2].flags = UV_INHERIT_FD; 522 stdio[2].data.fd = 2; 523 524 char *argv0 = strdup(rt->argv[0]); 525 char *argv1 = strdup(script_path); 526 if (!argv0 || !argv1) { 527 free(argv0); 528 free(argv1); 529 uv_close((uv_handle_t *)&wt->stdout_pipe, NULL); 530 return UV_ENOMEM; 531 } 532 533 char *args[3] = {argv0, argv1, NULL}; 534 char **env = wt_build_worker_env(worker_data_json, env_store_json); 535 if (!env) { 536 free(argv0); 537 free(argv1); 538 uv_close((uv_handle_t *)&wt->stdout_pipe, NULL); 539 return UV_ENOMEM; 540 } 541 542 uv_process_options_t options; 543 memset(&options, 0, sizeof(options)); 544 options.file = argv0; 545 options.args = args; 546 options.env = env; 547 options.stdio_count = 3; 548 options.stdio = stdio; 549 options.exit_cb = wt_on_process_exit; 550 551 wt->process.data = wt; 552 wt->stdout_pipe.data = wt; 553 554 int rc = uv_spawn(loop, &wt->process, &options); 555 556 wt_free_env(env); 557 free(argv0); 558 free(argv1); 559 560 if (rc != 0) { 561 uv_close((uv_handle_t *)&wt->stdout_pipe, NULL); 562 return rc; 563 } 564 565 wt->spawned = true; 566 wt->refed = true; 567 568 rc = uv_read_start((uv_stream_t *)&wt->stdout_pipe, wt_alloc_cb, wt_read_cb); 569 if (rc != 0) { 570 uv_process_kill(&wt->process, SIGTERM); 571 } 572 return rc; 573} 574 575static ant_value_t worker_threads_worker_on(ant_t *js, ant_value_t *args, int nargs) { 576 if (nargs < 2 || vtype(args[0]) != T_STR || !is_callable(args[1])) { 577 return js_mkerr(js, "Worker.on(event, listener) requires (string, function)"); 578 } 579 580 ant_value_t this_obj = js_getthis(js); 581 if (!is_object_type(this_obj)) return js_mkerr(js, "invalid Worker receiver"); 582 583 size_t len = 0; 584 const char *event = js_getstr(js, args[0], &len); 585 if (!event) return js_mkerr(js, "invalid event name"); 586 587 if (len == 7 && memcmp(event, "message", 7) == 0) { 588 js_set_slot(this_obj, SLOT_WT_ON_MESSAGE, args[1]); 589 } else if (len == 4 && memcmp(event, "exit", 4) == 0) { 590 js_set_slot(this_obj, SLOT_WT_ON_EXIT, args[1]); 591 } 592 593 return this_obj; 594} 595 596static ant_value_t worker_threads_worker_once(ant_t *js, ant_value_t *args, int nargs) { 597 if (nargs < 2 || vtype(args[0]) != T_STR || !is_callable(args[1])) { 598 return js_mkerr(js, "Worker.once(event, listener) requires (string, function)"); 599 } 600 601 ant_value_t this_obj = js_getthis(js); 602 if (!is_object_type(this_obj)) return js_mkerr(js, "invalid Worker receiver"); 603 604 size_t len = 0; 605 const char *event = js_getstr(js, args[0], &len); 606 if (!event) return js_mkerr(js, "invalid event name"); 607 608 if (len == 7 && memcmp(event, "message", 7) == 0) { 609 js_set_slot(this_obj, SLOT_WT_ONCE_MESSAGE, args[1]); 610 } else if (len == 4 && memcmp(event, "exit", 4) == 0) { 611 js_set_slot(this_obj, SLOT_WT_ONCE_EXIT, args[1]); 612 } 613 614 return this_obj; 615} 616 617static ant_value_t worker_threads_worker_unref(ant_t *js, ant_value_t *args, int nargs) { 618 ant_value_t this_obj = js_getthis(js); 619 ant_worker_thread_t *wt = wt_get_worker(js, this_obj); 620 if (!wt) return js_mkerr(js, "invalid Worker receiver"); 621 if (wt->spawned && wt->refed) { 622 if (!uv_is_closing((uv_handle_t *)&wt->process)) uv_unref((uv_handle_t *)&wt->process); 623 if (!uv_is_closing((uv_handle_t *)&wt->stdout_pipe)) uv_unref((uv_handle_t *)&wt->stdout_pipe); 624 wt->refed = false; 625 } 626 return this_obj; 627} 628 629static ant_value_t worker_threads_worker_ref(ant_t *js, ant_value_t *args, int nargs) { 630 ant_value_t this_obj = js_getthis(js); 631 ant_worker_thread_t *wt = wt_get_worker(js, this_obj); 632 if (!wt) return js_mkerr(js, "invalid Worker receiver"); 633 if (wt->spawned && !wt->refed) { 634 if (!uv_is_closing((uv_handle_t *)&wt->process)) uv_ref((uv_handle_t *)&wt->process); 635 if (!uv_is_closing((uv_handle_t *)&wt->stdout_pipe)) uv_ref((uv_handle_t *)&wt->stdout_pipe); 636 wt->refed = true; 637 } 638 return this_obj; 639} 640 641static ant_value_t worker_threads_worker_terminate(ant_t *js, ant_value_t *args, int nargs) { 642 ant_value_t this_obj = js_getthis(js); 643 ant_worker_thread_t *wt = wt_get_worker(js, this_obj); 644 if (!wt) return js_mkerr(js, "invalid Worker receiver"); 645 646 if (wt->exited) return wt_make_resolved_promise(js, js_mknum((double)wt->exit_status)); 647 648 if (!wt->has_terminate_val) { 649 ant_value_t p = js_mkpromise(js); 650 wt->terminate_val = p; 651 wt->has_terminate_val = true; 652 } 653 654 int rc = uv_process_kill(&wt->process, SIGTERM); 655 if (rc != 0) { 656 ant_value_t p = wt->terminate_val; 657 js_reject_promise(js, p, js_mkerr(js, "terminate failed: %s", uv_strerror(rc))); 658 wt->terminate_val = js_mkundef(); 659 wt->has_terminate_val = false; 660 return p; 661 } 662 663 return wt->terminate_val; 664} 665 666static ant_value_t worker_threads_worker_post_message(ant_t *js, ant_value_t *args, int nargs) { 667 return js_mkerr(js, "Worker.postMessage is not implemented yet"); 668} 669 670static ant_value_t worker_threads_message_port_post_message(ant_t *js, ant_value_t *args, int nargs) { 671 ant_value_t this_obj = js_getthis(js); 672 if (!wt_is_message_port(js, this_obj)) return js_mkerr(js, "invalid MessagePort receiver"); 673 if (wt_port_is_closed(js, this_obj)) return js_mkundef(); 674 675 ant_value_t peer = js_get_slot(this_obj, SLOT_WT_PORT_PEER); 676 if (!wt_is_message_port(js, peer) || wt_port_is_closed(js, peer)) return js_mkundef(); 677 678 ant_value_t value = (nargs > 0) ? args[0] : js_mkundef(); 679 wt_port_queue_push(js, peer, value); 680 wt_port_drain(js, peer); 681 return js_mkundef(); 682} 683 684static ant_value_t worker_threads_message_port_on(ant_t *js, ant_value_t *args, int nargs) { 685 if (nargs < 2 || vtype(args[0]) != T_STR || !is_callable(args[1])) { 686 return js_mkerr(js, "MessagePort.on(event, listener) requires (string, function)"); 687 } 688 ant_value_t this_obj = js_getthis(js); 689 if (!wt_is_message_port(js, this_obj)) return js_mkerr(js, "invalid MessagePort receiver"); 690 691 size_t len = 0; 692 const char *event = js_getstr(js, args[0], &len); 693 if (!event) return js_mkerr(js, "invalid event name"); 694 if (len == 7 && memcmp(event, "message", 7) == 0) { 695 js_set_slot(this_obj, SLOT_WT_PORT_ON_MESSAGE, args[1]); 696 js_set_slot(this_obj, SLOT_WT_PORT_STARTED, js_true); 697 wt_port_drain(js, this_obj); 698 } 699 return this_obj; 700} 701 702static ant_value_t worker_threads_message_port_once(ant_t *js, ant_value_t *args, int nargs) { 703 if (nargs < 2 || vtype(args[0]) != T_STR || !is_callable(args[1])) { 704 return js_mkerr(js, "MessagePort.once(event, listener) requires (string, function)"); 705 } 706 ant_value_t this_obj = js_getthis(js); 707 if (!wt_is_message_port(js, this_obj)) return js_mkerr(js, "invalid MessagePort receiver"); 708 709 size_t len = 0; 710 const char *event = js_getstr(js, args[0], &len); 711 if (!event) return js_mkerr(js, "invalid event name"); 712 if (len == 7 && memcmp(event, "message", 7) == 0) { 713 js_set_slot(this_obj, SLOT_WT_PORT_ONCE_MESSAGE, args[1]); 714 js_set_slot(this_obj, SLOT_WT_PORT_STARTED, js_true); 715 wt_port_drain(js, this_obj); 716 } 717 return this_obj; 718} 719 720static ant_value_t worker_threads_message_port_start(ant_t *js, ant_value_t *args, int nargs) { 721 ant_value_t this_obj = js_getthis(js); 722 if (!wt_is_message_port(js, this_obj)) return js_mkerr(js, "invalid MessagePort receiver"); 723 js_set_slot(this_obj, SLOT_WT_PORT_STARTED, js_true); 724 wt_port_drain(js, this_obj); 725 return js_mkundef(); 726} 727 728static ant_value_t worker_threads_message_port_close(ant_t *js, ant_value_t *args, int nargs) { 729 ant_value_t this_obj = js_getthis(js); 730 if (!wt_is_message_port(js, this_obj)) return js_mkerr(js, "invalid MessagePort receiver"); 731 wt_port_set_closed(js, this_obj, true); 732 733 ant_value_t peer = js_get_slot(this_obj, SLOT_WT_PORT_PEER); 734 js_set_slot(this_obj, SLOT_WT_PORT_PEER, js_mknull()); 735 if (wt_is_message_port(js, peer)) js_set_slot(peer, SLOT_WT_PORT_PEER, js_mknull()); 736 return js_mkundef(); 737} 738 739static ant_value_t worker_threads_message_port_ref(ant_t *js, ant_value_t *args, int nargs) { 740 return js_mkundef(); 741} 742 743static ant_value_t worker_threads_message_port_unref(ant_t *js, ant_value_t *args, int nargs) { 744 return js_mkundef(); 745} 746 747static ant_value_t worker_threads_message_port_ctor(ant_t *js, ant_value_t *args, int nargs) { 748 return js_mkerr(js, "MessagePort constructor is not public"); 749} 750 751static ant_value_t worker_threads_message_channel_ctor(ant_t *js, ant_value_t *args, int nargs) { 752 if (vtype(js->new_target) == T_UNDEF) { 753 return js_mkerr(js, "MessageChannel constructor requires 'new'"); 754 } 755 756 ant_value_t this_obj = js_getthis(js); 757 ant_value_t port1 = wt_make_message_port(js); 758 ant_value_t port2 = wt_make_message_port(js); 759 js_set_slot(port1, SLOT_WT_PORT_PEER, port2); 760 js_set_slot(port2, SLOT_WT_PORT_PEER, port1); 761 js_set(js, this_obj, "port1", port1); 762 js_set(js, this_obj, "port2", port2); 763 return this_obj; 764} 765 766static ant_value_t worker_threads_worker_ctor(ant_t *js, ant_value_t *args, int nargs) { 767 ant_value_t this_obj = js_getthis(js); 768 ant_value_t proto = js_instance_proto_from_new_target(js, js_mkundef()); 769 770 if (vtype(js->new_target) == T_UNDEF) { 771 return js_mkerr(js, "Worker constructor requires 'new'"); 772 } 773 if (nargs < 1) return js_mkerr(js, "Worker() requires a filename or URL"); 774 if (is_object_type(this_obj) && is_object_type(proto)) js_set_proto_init(this_obj, proto); 775 776 char *script_path = wt_path_from_specifier(js, args[0]); 777 if (!script_path) return js_mkerr(js, "Invalid Worker filename/URL"); 778 779 const char *worker_data_json = NULL; 780 const char *env_store_json = NULL; 781 char *worker_data_heap = NULL; 782 char *env_store_heap = NULL; 783 if (nargs >= 2 && is_object_type(args[1])) { 784 ant_value_t worker_data = js_get(js, args[1], "workerData"); 785 if (!is_undefined(worker_data)) { 786 ant_value_t stringify_args[1] = {worker_data}; 787 ant_value_t json = js_json_stringify(js, stringify_args, 1); 788 if (vtype(json) != T_STR) { 789 free(script_path); 790 return js_mkerr(js, "Worker options.workerData must be JSON-serializable"); 791 } 792 size_t len = 0; 793 const char *raw = js_getstr(js, json, &len); 794 if (!raw) { 795 free(script_path); 796 return js_mkerr(js, "Failed to stringify workerData"); 797 } 798 worker_data_heap = strndup(raw, len); 799 if (!worker_data_heap) { 800 free(script_path); 801 return js_mkerr(js, "Out of memory"); 802 } 803 worker_data_json = worker_data_heap; 804 } 805 } 806 807 ant_value_t env_store = wt_get_or_create_env_store(js); 808 ant_value_t env_stringify_args[1] = {env_store}; 809 ant_value_t env_json = js_json_stringify(js, env_stringify_args, 1); 810 if (vtype(env_json) != T_STR) { 811 free(script_path); 812 free(worker_data_heap); 813 return js_mkerr(js, "setEnvironmentData values must be JSON-serializable"); 814 } 815 size_t env_len = 0; 816 const char *env_raw = js_getstr(js, env_json, &env_len); 817 if (!env_raw) { 818 free(script_path); 819 free(worker_data_heap); 820 return js_mkerr(js, "Failed to snapshot environment data"); 821 } 822 env_store_heap = strndup(env_raw, env_len); 823 if (!env_store_heap) { 824 free(script_path); 825 free(worker_data_heap); 826 return js_mkerr(js, "Out of memory"); 827 } 828 env_store_json = env_store_heap; 829 830 ant_worker_thread_t *wt = (ant_worker_thread_t *)calloc(1, sizeof(*wt)); 831 if (!wt) { 832 free(script_path); 833 free(worker_data_heap); 834 free(env_store_heap); 835 return js_mkerr(js, "Out of memory"); 836 } 837 838 wt->js = js; 839 wt->self_val = this_obj; 840 js_set_native(this_obj, wt, WORKER_NATIVE_TAG); 841 842 wt->prev = NULL; 843 wt->next = active_workers_head; 844 if (active_workers_head) active_workers_head->prev = wt; 845 active_workers_head = wt; 846 847 int rc = wt_spawn_worker(wt, script_path, worker_data_json, env_store_json); 848 free(script_path); 849 free(worker_data_heap); 850 free(env_store_heap); 851 852 if (rc != 0) { 853 js_set_slot(this_obj, SLOT_DATA, js_mkundef()); 854 wt_cleanup(wt); 855 return js_mkerr(js, "Failed to spawn Worker: %s", uv_strerror(rc)); 856 } 857 858 return this_obj; 859} 860 861static ant_value_t worker_threads_parent_post_message(ant_t *js, ant_value_t *args, int nargs) { 862 ant_value_t value = (nargs > 0) ? args[0] : js_mkundef(); 863 ant_value_t stringify_args[1] = {value}; 864 ant_value_t json = js_json_stringify(js, stringify_args, 1); 865 if (vtype(json) != T_STR) return js_mkerr(js, "parentPort.postMessage payload must be JSON-serializable"); 866 867 size_t json_len = 0; 868 const char *json_str = js_getstr(js, json, &json_len); 869 if (!json_str) return js_mkerr(js, "Failed to serialize message"); 870 871 size_t prefix_len = strlen(WT_MSG_PREFIX); 872 size_t line_len = prefix_len + json_len + 1; 873 char *line = (char *)malloc(line_len + 1); 874 if (!line) return js_mkerr(js, "Out of memory"); 875 876 memcpy(line, WT_MSG_PREFIX, prefix_len); 877 memcpy(line + prefix_len, json_str, json_len); 878 line[prefix_len + json_len] = '\n'; 879 line[prefix_len + json_len + 1] = '\0'; 880 881 ssize_t wrote = WT_WRITE(WT_STDOUT_FD, line, (unsigned int)line_len); 882 free(line); 883 if (wrote < 0) return js_mkerr(js, "parentPort.postMessage failed"); 884 return js_mkundef(); 885} 886 887static ant_value_t worker_threads_parent_unref(ant_t *js, ant_value_t *args, int nargs) { 888 return js_mkundef(); 889} 890 891static ant_value_t worker_threads_mark_as_untransferable(ant_t *js, ant_value_t *args, int nargs) { 892 if (nargs < 1) return js_mkundef(); 893 return args[0]; 894} 895 896static ant_value_t worker_threads_receive_message_on_port(ant_t *js, ant_value_t *args, int nargs) { 897 if (nargs < 1 || !wt_is_message_port(js, args[0])) { 898 return js_mkerr(js, "receiveMessageOnPort(port) requires a MessagePort"); 899 } 900 ant_value_t msg = js_mkundef(); 901 if (!wt_port_queue_dequeue(js, args[0], &msg)) return js_mkundef(); 902 903 ant_value_t out = js_mkobj(js); 904 js_set(js, out, "message", msg); 905 return out; 906} 907 908static ant_value_t worker_threads_set_environment_data(ant_t *js, ant_value_t *args, int nargs) { 909 if (nargs < 2) return js_mkerr(js, "setEnvironmentData(key, value) requires 2 arguments"); 910 911 ant_value_t key_stringify_args[1] = {args[0]}; 912 ant_value_t key_json = js_json_stringify(js, key_stringify_args, 1); 913 if (vtype(key_json) != T_STR) return js_mkerr(js, "setEnvironmentData key must be JSON-serializable"); 914 915 ant_value_t value_stringify_args[1] = {args[1]}; 916 ant_value_t value_json = js_json_stringify(js, value_stringify_args, 1); 917 if (vtype(value_json) != T_STR) return js_mkerr(js, "setEnvironmentData value must be JSON-serializable"); 918 919 ant_value_t cloned = json_parse_value(js, value_json); 920 if (is_err(cloned)) return js_mkerr(js, "setEnvironmentData value must be JSON-serializable"); 921 922 size_t key_len = 0; 923 const char *key_ptr = js_getstr(js, key_json, &key_len); 924 if (!key_ptr) return js_mkerr(js, "Failed to serialize environment data key"); 925 926 char *key = strndup(key_ptr, key_len); 927 if (!key) return js_mkerr(js, "Out of memory"); 928 js_set(js, wt_get_or_create_env_store(js), key, cloned); 929 free(key); 930 return js_mkundef(); 931} 932 933static ant_value_t worker_threads_get_environment_data(ant_t *js, ant_value_t *args, int nargs) { 934 if (nargs < 1) return js_mkundef(); 935 936 ant_value_t key_stringify_args[1] = {args[0]}; 937 ant_value_t key_json = js_json_stringify(js, key_stringify_args, 1); 938 if (vtype(key_json) != T_STR) return js_mkundef(); 939 940 size_t key_len = 0; 941 const char *key_ptr = js_getstr(js, key_json, &key_len); 942 if (!key_ptr) return js_mkundef(); 943 944 char *key = strndup(key_ptr, key_len); 945 if (!key) return js_mkerr(js, "Out of memory"); 946 947 ant_value_t store = wt_get_or_create_env_store(js); 948 ant_offset_t off = lkp(js, store, key, key_len); 949 if (off == 0) { 950 free(key); 951 return js_mkundef(); 952 } 953 954 ant_value_t value = js_get(js, store, key); 955 free(key); 956 return value; 957} 958 959static ant_value_t worker_threads_move_message_port_to_context(ant_t *js, ant_value_t *args, int nargs) { 960 if (nargs < 1 || !wt_is_message_port(js, args[0])) { 961 return js_mkerr(js, "moveMessagePortToContext(port, context) requires a MessagePort"); 962 } 963 return args[0]; 964} 965 966void gc_mark_worker_threads(ant_t *js, gc_mark_fn mark) { 967 for (ant_worker_thread_t *wt = active_workers_head; wt; wt = wt->next) { 968 mark(js, wt->self_val); 969 if (wt->has_terminate_val) mark(js, wt->terminate_val); 970 } 971} 972 973ant_value_t worker_threads_library(ant_t *js) { 974 ant_value_t lib = js_mkobj(js); 975 bool is_worker = wt_is_worker_mode(); 976 977 wt_init_env_store(js, is_worker); 978 979 js_set(js, lib, "isMainThread", js_bool(!is_worker)); 980 js_set(js, lib, "threadId", js_mknum((double)(is_worker ? rt->pid : 0))); 981 js_set(js, lib, "SHARE_ENV", js_mksym(js, "SHARE_ENV")); 982 983 ant_value_t message_port_ctor_obj = js_mkobj(js); 984 ant_value_t message_port_proto = js_mkobj(js); 985 js_set(js, message_port_proto, "postMessage", js_mkfun(worker_threads_message_port_post_message)); 986 js_set(js, message_port_proto, "on", js_mkfun(worker_threads_message_port_on)); 987 js_set(js, message_port_proto, "once", js_mkfun(worker_threads_message_port_once)); 988 js_set(js, message_port_proto, "start", js_mkfun(worker_threads_message_port_start)); 989 js_set(js, message_port_proto, "close", js_mkfun(worker_threads_message_port_close)); 990 js_set(js, message_port_proto, "ref", js_mkfun(worker_threads_message_port_ref)); 991 js_set(js, message_port_proto, "unref", js_mkfun(worker_threads_message_port_unref)); 992 js_set_sym(js, message_port_proto, get_toStringTag_sym(), js_mkstr(js, "MessagePort", 11)); 993 994 js_set_slot(message_port_ctor_obj, SLOT_CFUNC, js_mkfun(worker_threads_message_port_ctor)); 995 js_mkprop_fast(js, message_port_ctor_obj, "prototype", 9, message_port_proto); 996 js_mkprop_fast(js, message_port_ctor_obj, "name", 4, js_mkstr(js, "MessagePort", 11)); 997 js_set_descriptor(js, message_port_ctor_obj, "name", 4, 0); 998 js_set(js, lib, "MessagePort", js_obj_to_func(message_port_ctor_obj)); 999 js_set_slot(js->global, SLOT_WT_PORT_PROTO, message_port_proto); 1000 1001 ant_value_t message_channel_ctor_obj = js_mkobj(js); 1002 ant_value_t message_channel_proto = js_mkobj(js); 1003 js_set_sym(js, message_channel_proto, get_toStringTag_sym(), js_mkstr(js, "MessageChannel", 14)); 1004 js_set_slot(message_channel_ctor_obj, SLOT_CFUNC, js_mkfun(worker_threads_message_channel_ctor)); 1005 js_mkprop_fast(js, message_channel_ctor_obj, "prototype", 9, message_channel_proto); 1006 js_mkprop_fast(js, message_channel_ctor_obj, "name", 4, js_mkstr(js, "MessageChannel", 14)); 1007 js_set_descriptor(js, message_channel_ctor_obj, "name", 4, 0); 1008 js_set(js, lib, "MessageChannel", js_obj_to_func(message_channel_ctor_obj)); 1009 1010 if (is_worker) { 1011 ant_value_t parent_port = js_mkobj(js); 1012 js_set(js, parent_port, "postMessage", js_mkfun(worker_threads_parent_post_message)); 1013 js_set(js, parent_port, "unref", js_mkfun(worker_threads_parent_unref)); 1014 js_set(js, parent_port, "ref", js_mkfun(worker_threads_parent_unref)); 1015 js_set(js, lib, "parentPort", parent_port); 1016 1017 const char *worker_data_json = getenv(WT_ENV_DATA_JSON); 1018 if (worker_data_json && worker_data_json[0]) { 1019 ant_value_t raw = js_mkstr(js, worker_data_json, strlen(worker_data_json)); 1020 ant_value_t parsed = json_parse_value(js, raw); 1021 js_set(js, lib, "workerData", is_err(parsed) ? js_mkundef() : parsed); 1022 } else js_set(js, lib, "workerData", js_mkundef()); 1023 } else { 1024 js_set(js, lib, "parentPort", js_mknull()); 1025 js_set(js, lib, "workerData", js_mkundef()); 1026 } 1027 1028 ant_value_t worker_ctor_obj = js_mkobj(js); 1029 ant_value_t worker_proto = js_mkobj(js); 1030 1031 js_set(js, worker_proto, "on", js_mkfun(worker_threads_worker_on)); 1032 js_set(js, worker_proto, "once", js_mkfun(worker_threads_worker_once)); 1033 js_set(js, worker_proto, "terminate", js_mkfun(worker_threads_worker_terminate)); 1034 js_set(js, worker_proto, "unref", js_mkfun(worker_threads_worker_unref)); 1035 js_set(js, worker_proto, "ref", js_mkfun(worker_threads_worker_ref)); 1036 js_set(js, worker_proto, "postMessage", js_mkfun(worker_threads_worker_post_message)); 1037 js_set_sym(js, worker_proto, get_toStringTag_sym(), js_mkstr(js, "Worker", 6)); 1038 1039 js_set_slot(worker_ctor_obj, SLOT_CFUNC, js_mkfun(worker_threads_worker_ctor)); 1040 js_mkprop_fast(js, worker_ctor_obj, "prototype", 9, worker_proto); 1041 js_mkprop_fast(js, worker_ctor_obj, "name", 4, js_mkstr(js, "Worker", 6)); 1042 js_set_descriptor(js, worker_ctor_obj, "name", 4, 0); 1043 js_set(js, lib, "Worker", js_obj_to_func(worker_ctor_obj)); 1044 1045 js_set(js, lib, "markAsUntransferable", js_mkfun(worker_threads_mark_as_untransferable)); 1046 js_set(js, lib, "receiveMessageOnPort", js_mkfun(worker_threads_receive_message_on_port)); 1047 js_set(js, lib, "setEnvironmentData", js_mkfun(worker_threads_set_environment_data)); 1048 js_set(js, lib, "getEnvironmentData", js_mkfun(worker_threads_get_environment_data)); 1049 js_set(js, lib, "moveMessagePortToContext", js_mkfun(worker_threads_move_message_port_to_context)); 1050 1051 js_set_sym(js, lib, get_toStringTag_sym(), js_mkstr(js, "worker_threads", 14)); 1052 return lib; 1053}