MIRROR: javascript for 馃悳's, a tiny runtime with big ambitions
1// stub: minimal node:worker_threads implementation
2// just enough for rolldown to run transforms
3
4#include <compat.h> // IWYU pragma: keep
5
6#include <uv.h>
7#include <stdio.h>
8#include <stdlib.h>
9#include <string.h>
10#include <signal.h>
11
12#ifdef _WIN32
13#include <io.h>
14#define WT_WRITE _write
15#define WT_STDOUT_FD 1
16extern char **_environ;
17#define environ _environ
18#else
19#include <unistd.h>
20#define WT_WRITE write
21#define WT_STDOUT_FD STDOUT_FILENO
22extern char **environ;
23#endif
24
25#include "ant.h"
26#include "internal.h"
27#include "runtime.h"
28#include "descriptors.h"
29#include "silver/engine.h"
30#include "modules/json.h"
31#include "modules/symbol.h"
32#include "modules/worker_threads.h"
33#include "gc/modules.h"
34
35#define WT_ENV_MODE "ANT_WORKER_THREADS_MODE"
36#define WT_ENV_DATA_JSON "ANT_WORKER_DATA_JSON"
37#define WT_ENV_STORE_JSON "ANT_WORKER_ENV_DATA_JSON"
38#define WT_MSG_PREFIX "ANT_WT_MSG:"
39
40typedef struct ant_worker_thread {
41 ant_t *js;
42 uv_process_t process;
43 uv_pipe_t stdout_pipe;
44 bool spawned;
45 bool exited;
46 bool closing;
47 bool closed;
48 bool refed;
49 int close_pending;
50 int64_t exit_status;
51 int term_signal;
52 char *line_buf;
53 size_t line_len;
54 size_t line_cap;
55 ant_value_t self_val;
56 ant_value_t terminate_val;
57 bool has_terminate_val;
58 struct ant_worker_thread *next;
59 struct ant_worker_thread *prev;
60} ant_worker_thread_t;
61
62static ant_worker_thread_t *active_workers_head = NULL;
63
64static bool wt_is_worker_mode(void) {
65 const char *mode = getenv(WT_ENV_MODE);
66 return mode && strcmp(mode, "1") == 0;
67}
68
69static ant_value_t wt_get_or_create_env_store(ant_t *js) {
70 ant_value_t store = js_get_slot(js->global, SLOT_WT_ENV_STORE);
71 if (!is_object_type(store)) {
72 store = js_mkobj(js);
73 js_set_slot(js->global, SLOT_WT_ENV_STORE, store);
74 }
75 return store;
76}
77
78static void wt_init_env_store(ant_t *js, bool is_worker) {
79 ant_value_t store = js_mkobj(js);
80 if (is_worker) {
81 const char *raw = getenv(WT_ENV_STORE_JSON);
82 if (raw && raw[0]) {
83 ant_value_t input = js_mkstr(js, raw, strlen(raw));
84 ant_value_t parsed = json_parse_value(js, input);
85 if (is_object_type(parsed)) store = parsed;
86 }
87 }
88 js_set_slot(js->global, SLOT_WT_ENV_STORE, store);
89}
90
91static ant_worker_thread_t *wt_get_worker(ant_t *js, ant_value_t this_obj) {
92 if (!is_object_type(this_obj)) return NULL;
93 ant_value_t data = js_get_slot(this_obj, SLOT_DATA);
94 if (vtype(data) != T_NUM) return NULL;
95 return (ant_worker_thread_t *)(uintptr_t)js_getnum(data);
96}
97
98static bool wt_is_message_port(ant_t *js, ant_value_t obj) {
99 if (!is_object_type(obj)) return false;
100 ant_value_t tag = js_get_slot(obj, SLOT_WT_PORT_TAG);
101 return js_truthy(js, tag);
102}
103
104static ant_value_t wt_get_message_port_proto(ant_t *js) {
105 ant_value_t proto = js_get_slot(js->global, SLOT_WT_PORT_PROTO);
106 if (!is_object_type(proto)) proto = js_mkobj(js);
107 return proto;
108}
109
110static ant_value_t wt_make_message_port(ant_t *js) {
111 ant_value_t port = js_mkobj(js);
112 js_set_proto_init(port, wt_get_message_port_proto(js));
113 js_set_slot(port, SLOT_WT_PORT_TAG, js_true);
114 js_set_slot(port, SLOT_WT_PORT_QUEUE, js_mkarr(js));
115 js_set_slot(port, SLOT_WT_PORT_HEAD, js_mknum(0));
116 js_set_slot(port, SLOT_WT_PORT_PEER, js_mknull());
117 js_set_slot(port, SLOT_WT_PORT_CLOSED, js_false);
118 js_set_slot(port, SLOT_WT_PORT_STARTED, js_false);
119 js_set_slot(port, SLOT_WT_PORT_ON_MESSAGE, js_mkundef());
120 js_set_slot(port, SLOT_WT_PORT_ONCE_MESSAGE, js_mkundef());
121 js_set(js, port, "onmessage", js_mkundef());
122 return port;
123}
124
125static bool wt_port_is_closed(ant_t *js, ant_value_t port) {
126 return js_truthy(js, js_get_slot(port, SLOT_WT_PORT_CLOSED));
127}
128
129static void wt_port_set_closed(ant_t *js, ant_value_t port, bool closed) {
130 js_set_slot(port, SLOT_WT_PORT_CLOSED, js_bool(closed));
131}
132
133static bool wt_port_queue_dequeue(ant_t *js, ant_value_t port, ant_value_t *out) {
134 ant_value_t queue = js_get_slot(port, SLOT_WT_PORT_QUEUE);
135 if (vtype(queue) != T_ARR) return false;
136
137 ant_value_t head_val = js_get_slot(port, SLOT_WT_PORT_HEAD);
138 ant_offset_t head = (vtype(head_val) == T_NUM) ? (ant_offset_t)js_getnum(head_val) : 0;
139 ant_offset_t len = js_arr_len(js, queue);
140 if (len <= 0 || head >= len) {
141 js_set_slot(port, SLOT_WT_PORT_QUEUE, js_mkarr(js));
142 js_set_slot(port, SLOT_WT_PORT_HEAD, js_mknum(0));
143 return false;
144 }
145
146 if (out) *out = js_arr_get(js, queue, head);
147 ant_offset_t next_head = head + 1;
148
149 if (next_head >= len) {
150 js_set_slot(port, SLOT_WT_PORT_QUEUE, js_mkarr(js));
151 js_set_slot(port, SLOT_WT_PORT_HEAD, js_mknum(0));
152 return true;
153 }
154
155 if (next_head > 32 && next_head * 2 >= len) {
156 ant_value_t compact = js_mkarr(js);
157 for (ant_offset_t i = next_head; i < len; i++) js_arr_push(js, compact, js_arr_get(js, queue, i));
158 js_set_slot(port, SLOT_WT_PORT_QUEUE, compact);
159 js_set_slot(port, SLOT_WT_PORT_HEAD, js_mknum(0));
160 return true;
161 }
162
163 js_set_slot(port, SLOT_WT_PORT_HEAD, js_mknum((double)next_head));
164 return true;
165}
166
167static void wt_port_queue_push(ant_t *js, ant_value_t port, ant_value_t value) {
168 ant_value_t queue = js_get_slot(port, SLOT_WT_PORT_QUEUE);
169 if (vtype(queue) != T_ARR) {
170 queue = js_mkarr(js);
171 js_set_slot(port, SLOT_WT_PORT_QUEUE, queue);
172 js_set_slot(port, SLOT_WT_PORT_HEAD, js_mknum(0));
173 }
174 js_arr_push(js, queue, value);
175}
176
177static void wt_port_call_listener(ant_t *js, ant_value_t this_obj, ant_value_t fn, ant_value_t arg) {
178 if (!is_callable(fn)) return;
179 ant_value_t argv[1] = {arg};
180
181 sv_vm_call(js->vm, js, fn, this_obj, argv, 1, NULL, false);
182}
183
184static bool wt_port_should_deliver(ant_t *js, ant_value_t port) {
185 if (wt_port_is_closed(js, port)) return false;
186 bool started = js_truthy(js, js_get_slot(port, SLOT_WT_PORT_STARTED));
187 ant_value_t on_fn = js_get_slot(port, SLOT_WT_PORT_ON_MESSAGE);
188 ant_value_t once_fn = js_get_slot(port, SLOT_WT_PORT_ONCE_MESSAGE);
189 ant_value_t onmessage = js_get(js, port, "onmessage");
190
191 bool has_event_listener = is_callable(on_fn) || is_callable(once_fn);
192 if (is_callable(onmessage)) return true;
193 return started && has_event_listener;
194}
195
196static void wt_port_drain(ant_t *js, ant_value_t port) {
197 if (!wt_is_message_port(js, port)) return;
198
199 while (wt_port_should_deliver(js, port)) {
200 ant_value_t msg = js_mkundef();
201 if (!wt_port_queue_dequeue(js, port, &msg)) break;
202
203 ant_value_t on_fn = js_get_slot(port, SLOT_WT_PORT_ON_MESSAGE);
204 wt_port_call_listener(js, port, on_fn, msg);
205
206 ant_value_t once_fn = js_get_slot(port, SLOT_WT_PORT_ONCE_MESSAGE);
207 if (is_callable(once_fn)) {
208 wt_port_call_listener(js, port, once_fn, msg);
209 js_set_slot(port, SLOT_WT_PORT_ONCE_MESSAGE, js_mkundef());
210 }
211
212 ant_value_t onmessage = js_get(js, port, "onmessage");
213 if (is_callable(onmessage)) {
214 ant_value_t event_obj = js_mkobj(js);
215 js_set(js, event_obj, "data", msg);
216 wt_port_call_listener(js, port, onmessage, event_obj);
217 }
218
219 if (wt_port_is_closed(js, port)) break;
220 }
221}
222
223static ant_value_t wt_make_resolved_promise(ant_t *js, ant_value_t value) {
224 ant_value_t p = js_mkpromise(js);
225 js_resolve_promise(js, p, value);
226 return p;
227}
228
229static void wt_call_listener(ant_t *js, ant_value_t this_obj, ant_value_t fn, ant_value_t arg) {
230 if (!is_callable(fn)) return;
231 ant_value_t argv[1] = {arg};
232
233 sv_vm_call(js->vm, js, fn, this_obj, argv, 1, NULL, false);
234}
235
236static void wt_emit(ant_worker_thread_t *wt, const char *event, ant_value_t arg) {
237 if (!wt || !wt->js) return;
238 ant_t *js = wt->js;
239 ant_value_t this_obj = wt->self_val;
240 if (!is_object_type(this_obj)) return;
241
242 internal_slot_t on_slot, once_slot;
243 if (strcmp(event, "message") == 0) {
244 on_slot = SLOT_WT_ON_MESSAGE;
245 once_slot = SLOT_WT_ONCE_MESSAGE;
246 } else if (strcmp(event, "exit") == 0) {
247 on_slot = SLOT_WT_ON_EXIT;
248 once_slot = SLOT_WT_ONCE_EXIT;
249 } else return;
250
251 ant_value_t on_fn = js_get_slot(this_obj, on_slot);
252 wt_call_listener(js, this_obj, on_fn, arg);
253
254 ant_value_t once_fn = js_get_slot(this_obj, once_slot);
255 if (is_callable(once_fn)) {
256 wt_call_listener(js, this_obj, once_fn, arg);
257 js_set_slot(this_obj, once_slot, js_mkundef());
258 }
259}
260
261static void wt_free_env(char **env) {
262 if (!env) return;
263 for (char **p = env; *p; p++) free(*p);
264 free(env);
265}
266
267static char **wt_build_worker_env(const char *worker_data_json, const char *env_store_json) {
268 size_t count = 0;
269 if (environ) {
270 while (environ[count]) count++;
271 }
272
273 size_t extra = 2;
274 if (worker_data_json) extra++;
275 if (env_store_json) extra++;
276 char **env = (char **)calloc(count + extra, sizeof(char *));
277 if (!env) return NULL;
278
279 size_t out = 0;
280 for (size_t i = 0; i < count; i++) {
281 env[out] = strdup(environ[i]);
282 if (!env[out]) {
283 wt_free_env(env);
284 return NULL;
285 }
286 out++;
287 }
288
289 env[out++] = strdup(WT_ENV_MODE "=1");
290 if (!env[out - 1]) {
291 wt_free_env(env);
292 return NULL;
293 }
294
295 if (worker_data_json) {
296 size_t key_len = strlen(WT_ENV_DATA_JSON);
297 size_t val_len = strlen(worker_data_json);
298 char *entry = (char *)malloc(key_len + 1 + val_len + 1);
299 if (!entry) {
300 wt_free_env(env);
301 return NULL;
302 }
303 memcpy(entry, WT_ENV_DATA_JSON, key_len);
304 entry[key_len] = '=';
305 memcpy(entry + key_len + 1, worker_data_json, val_len);
306 entry[key_len + 1 + val_len] = '\0';
307 env[out++] = entry;
308 }
309
310 if (env_store_json) {
311 size_t key_len = strlen(WT_ENV_STORE_JSON);
312 size_t val_len = strlen(env_store_json);
313 char *entry = (char *)malloc(key_len + 1 + val_len + 1);
314 if (!entry) {
315 wt_free_env(env);
316 return NULL;
317 }
318 memcpy(entry, WT_ENV_STORE_JSON, key_len);
319 entry[key_len] = '=';
320 memcpy(entry + key_len + 1, env_store_json, val_len);
321 entry[key_len + 1 + val_len] = '\0';
322 env[out++] = entry;
323 }
324
325 env[out] = NULL;
326 return env;
327}
328
329static void wt_cleanup(ant_worker_thread_t *wt) {
330 if (!wt) return;
331 free(wt->line_buf);
332 free(wt);
333}
334
335static void wt_detach(ant_worker_thread_t *wt) {
336 if (!wt) return;
337
338 if (wt->prev) wt->prev->next = wt->next;
339 else active_workers_head = wt->next;
340 if (wt->next) wt->next->prev = wt->prev;
341 wt->prev = NULL;
342 wt->next = NULL;
343
344 wt->self_val = js_mkundef();
345 wt->terminate_val = js_mkundef();
346 wt->has_terminate_val = false;
347 free(wt->line_buf);
348 wt->line_buf = NULL;
349 wt->line_len = 0;
350 wt->line_cap = 0;
351 wt->spawned = false;
352 wt->refed = false;
353 wt->closed = true;
354 wt->js = NULL;
355}
356
357static void wt_on_handle_closed(uv_handle_t *h) {
358 ant_worker_thread_t *wt = (ant_worker_thread_t *)h->data;
359 if (!wt) return;
360 if (wt->close_pending > 0) wt->close_pending--;
361 if (wt->close_pending == 0) wt_detach(wt);
362}
363
364static void wt_finish_exit(ant_worker_thread_t *wt) {
365 if (!wt) return;
366 if (wt->closed || wt->closing) return;
367 wt->closing = true;
368 wt->spawned = false;
369 wt->refed = false;
370 wt->close_pending = 0;
371
372 if (!uv_is_closing((uv_handle_t *)&wt->stdout_pipe)) {
373 wt->close_pending++;
374 uv_close((uv_handle_t *)&wt->stdout_pipe, wt_on_handle_closed);
375 }
376 if (!uv_is_closing((uv_handle_t *)&wt->process)) {
377 wt->close_pending++;
378 uv_close((uv_handle_t *)&wt->process, wt_on_handle_closed);
379 }
380
381 if (wt->close_pending == 0) wt_detach(wt);
382}
383
384static void wt_on_process_exit(uv_process_t *proc, int64_t exit_status, int term_signal) {
385 ant_worker_thread_t *wt = (ant_worker_thread_t *)proc->data;
386 if (!wt || !wt->js) return;
387
388 wt->exited = true;
389 wt->exit_status = exit_status;
390 wt->term_signal = term_signal;
391
392 if (wt->has_terminate_val) {
393 ant_value_t p = wt->terminate_val;
394 js_resolve_promise(wt->js, p, js_mknum((double)exit_status));
395 wt->terminate_val = js_mkundef();
396 wt->has_terminate_val = false;
397 }
398
399 wt_emit(wt, "exit", js_mknum((double)exit_status));
400 wt_finish_exit(wt);
401}
402
403static void wt_alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
404 buf->base = (char *)malloc(suggested_size);
405 buf->len = buf->base ? suggested_size : 0;
406}
407
408static void wt_emit_message_from_json(ant_worker_thread_t *wt, const char *json, size_t len) {
409 if (!wt || !wt->js) return;
410 ant_t *js = wt->js;
411
412 ant_value_t s = js_mkstr(js, json, len);
413 ant_value_t msg = json_parse_value(js, s);
414 if (is_err(msg)) msg = s;
415
416 wt_emit(wt, "message", msg);
417}
418
419static void wt_process_lines(ant_worker_thread_t *wt) {
420 if (!wt || !wt->line_buf) return;
421
422 for (;;) {
423 char *nl = memchr(wt->line_buf, '\n', wt->line_len);
424 if (!nl) break;
425
426 size_t line_len = (size_t)(nl - wt->line_buf);
427 if (line_len > 0 && wt->line_buf[line_len - 1] == '\r') line_len--;
428
429 size_t prefix_len = strlen(WT_MSG_PREFIX);
430 if (line_len >= prefix_len && memcmp(wt->line_buf, WT_MSG_PREFIX, prefix_len) == 0) {
431 const char *payload = wt->line_buf + prefix_len;
432 size_t payload_len = line_len - prefix_len;
433 wt_emit_message_from_json(wt, payload, payload_len);
434 }
435
436 size_t consumed = (size_t)(nl - wt->line_buf) + 1;
437 size_t remain = wt->line_len - consumed;
438 memmove(wt->line_buf, wt->line_buf + consumed, remain);
439 wt->line_len = remain;
440 }
441}
442
443static bool wt_append_stdout(ant_worker_thread_t *wt, const char *data, size_t len) {
444 if (!wt || !data || len == 0) return true;
445
446 size_t needed = wt->line_len + len;
447 if (needed + 1 > wt->line_cap) {
448 size_t cap = wt->line_cap ? wt->line_cap : 1024;
449 while (cap < needed + 1) cap *= 2;
450 char *next = (char *)realloc(wt->line_buf, cap);
451 if (!next) return false;
452 wt->line_buf = next;
453 wt->line_cap = cap;
454 }
455
456 memcpy(wt->line_buf + wt->line_len, data, len);
457 wt->line_len += len;
458 wt->line_buf[wt->line_len] = '\0';
459 return true;
460}
461
462static void wt_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
463 ant_worker_thread_t *wt = (ant_worker_thread_t *)stream->data;
464 if (!wt) {
465 free(buf->base);
466 return;
467 }
468
469 if (nread > 0) {
470 if (wt_append_stdout(wt, buf->base, (size_t)nread)) wt_process_lines(wt);
471 } else if (nread < 0) {
472 uv_read_stop(stream);
473 }
474
475 free(buf->base);
476}
477
478static char *wt_path_from_specifier(ant_t *js, ant_value_t spec) {
479 const char *raw = NULL;
480 size_t len = 0;
481
482 if (vtype(spec) == T_STR) {
483 raw = js_getstr(js, spec, &len);
484 } else if (is_object_type(spec)) {
485 ant_value_t pathname = js_get(js, spec, "pathname");
486 if (vtype(pathname) == T_STR) raw = js_getstr(js, pathname, &len);
487 if (!raw) {
488 ant_value_t href = js_get(js, spec, "href");
489 if (vtype(href) == T_STR) raw = js_getstr(js, href, &len);
490 }
491 }
492
493 if (!raw || len == 0) return NULL;
494
495 if (len >= 7 && strncmp(raw, "file://", 7) == 0) {
496 const char *p = raw + 7;
497 if (strncmp(p, "localhost/", 10) == 0) p += 9;
498 if (*p == '\0') return NULL;
499 return strndup(p, len - (size_t)(p - raw));
500 }
501
502 return strndup(raw, len);
503}
504
505static int wt_spawn_worker(
506 ant_worker_thread_t *wt,
507 const char *script_path,
508 const char *worker_data_json,
509 const char *env_store_json
510) {
511 if (!wt || !wt->js || !script_path || !rt || !rt->argv || rt->argc <= 0) return UV_EINVAL;
512
513 uv_loop_t *loop = uv_default_loop();
514 uv_pipe_init(loop, &wt->stdout_pipe, 0);
515
516 uv_stdio_container_t stdio[3];
517 stdio[0].flags = UV_IGNORE;
518 stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
519 stdio[1].data.stream = (uv_stream_t *)&wt->stdout_pipe;
520 stdio[2].flags = UV_INHERIT_FD;
521 stdio[2].data.fd = 2;
522
523 char *argv0 = strdup(rt->argv[0]);
524 char *argv1 = strdup(script_path);
525 if (!argv0 || !argv1) {
526 free(argv0);
527 free(argv1);
528 uv_close((uv_handle_t *)&wt->stdout_pipe, NULL);
529 return UV_ENOMEM;
530 }
531
532 char *args[3] = {argv0, argv1, NULL};
533 char **env = wt_build_worker_env(worker_data_json, env_store_json);
534 if (!env) {
535 free(argv0);
536 free(argv1);
537 uv_close((uv_handle_t *)&wt->stdout_pipe, NULL);
538 return UV_ENOMEM;
539 }
540
541 uv_process_options_t options;
542 memset(&options, 0, sizeof(options));
543 options.file = argv0;
544 options.args = args;
545 options.env = env;
546 options.stdio_count = 3;
547 options.stdio = stdio;
548 options.exit_cb = wt_on_process_exit;
549
550 wt->process.data = wt;
551 wt->stdout_pipe.data = wt;
552
553 int rc = uv_spawn(loop, &wt->process, &options);
554
555 wt_free_env(env);
556 free(argv0);
557 free(argv1);
558
559 if (rc != 0) {
560 uv_close((uv_handle_t *)&wt->stdout_pipe, NULL);
561 return rc;
562 }
563
564 wt->spawned = true;
565 wt->refed = true;
566
567 rc = uv_read_start((uv_stream_t *)&wt->stdout_pipe, wt_alloc_cb, wt_read_cb);
568 if (rc != 0) {
569 uv_process_kill(&wt->process, SIGTERM);
570 }
571 return rc;
572}
573
574static ant_value_t worker_threads_worker_on(ant_t *js, ant_value_t *args, int nargs) {
575 if (nargs < 2 || vtype(args[0]) != T_STR || !is_callable(args[1])) {
576 return js_mkerr(js, "Worker.on(event, listener) requires (string, function)");
577 }
578
579 ant_value_t this_obj = js_getthis(js);
580 if (!is_object_type(this_obj)) return js_mkerr(js, "invalid Worker receiver");
581
582 size_t len = 0;
583 const char *event = js_getstr(js, args[0], &len);
584 if (!event) return js_mkerr(js, "invalid event name");
585
586 if (len == 7 && memcmp(event, "message", 7) == 0) {
587 js_set_slot(this_obj, SLOT_WT_ON_MESSAGE, args[1]);
588 } else if (len == 4 && memcmp(event, "exit", 4) == 0) {
589 js_set_slot(this_obj, SLOT_WT_ON_EXIT, args[1]);
590 }
591
592 return this_obj;
593}
594
595static ant_value_t worker_threads_worker_once(ant_t *js, ant_value_t *args, int nargs) {
596 if (nargs < 2 || vtype(args[0]) != T_STR || !is_callable(args[1])) {
597 return js_mkerr(js, "Worker.once(event, listener) requires (string, function)");
598 }
599
600 ant_value_t this_obj = js_getthis(js);
601 if (!is_object_type(this_obj)) return js_mkerr(js, "invalid Worker receiver");
602
603 size_t len = 0;
604 const char *event = js_getstr(js, args[0], &len);
605 if (!event) return js_mkerr(js, "invalid event name");
606
607 if (len == 7 && memcmp(event, "message", 7) == 0) {
608 js_set_slot(this_obj, SLOT_WT_ONCE_MESSAGE, args[1]);
609 } else if (len == 4 && memcmp(event, "exit", 4) == 0) {
610 js_set_slot(this_obj, SLOT_WT_ONCE_EXIT, args[1]);
611 }
612
613 return this_obj;
614}
615
616static ant_value_t worker_threads_worker_unref(ant_t *js, ant_value_t *args, int nargs) {
617 ant_value_t this_obj = js_getthis(js);
618 ant_worker_thread_t *wt = wt_get_worker(js, this_obj);
619 if (!wt) return js_mkerr(js, "invalid Worker receiver");
620 if (wt->spawned && wt->refed) {
621 if (!uv_is_closing((uv_handle_t *)&wt->process)) uv_unref((uv_handle_t *)&wt->process);
622 if (!uv_is_closing((uv_handle_t *)&wt->stdout_pipe)) uv_unref((uv_handle_t *)&wt->stdout_pipe);
623 wt->refed = false;
624 }
625 return this_obj;
626}
627
628static ant_value_t worker_threads_worker_ref(ant_t *js, ant_value_t *args, int nargs) {
629 ant_value_t this_obj = js_getthis(js);
630 ant_worker_thread_t *wt = wt_get_worker(js, this_obj);
631 if (!wt) return js_mkerr(js, "invalid Worker receiver");
632 if (wt->spawned && !wt->refed) {
633 if (!uv_is_closing((uv_handle_t *)&wt->process)) uv_ref((uv_handle_t *)&wt->process);
634 if (!uv_is_closing((uv_handle_t *)&wt->stdout_pipe)) uv_ref((uv_handle_t *)&wt->stdout_pipe);
635 wt->refed = true;
636 }
637 return this_obj;
638}
639
640static ant_value_t worker_threads_worker_terminate(ant_t *js, ant_value_t *args, int nargs) {
641 ant_value_t this_obj = js_getthis(js);
642 ant_worker_thread_t *wt = wt_get_worker(js, this_obj);
643 if (!wt) return js_mkerr(js, "invalid Worker receiver");
644
645 if (wt->exited) return wt_make_resolved_promise(js, js_mknum((double)wt->exit_status));
646
647 if (!wt->has_terminate_val) {
648 ant_value_t p = js_mkpromise(js);
649 wt->terminate_val = p;
650 wt->has_terminate_val = true;
651 }
652
653 int rc = uv_process_kill(&wt->process, SIGTERM);
654 if (rc != 0) {
655 ant_value_t p = wt->terminate_val;
656 js_reject_promise(js, p, js_mkerr(js, "terminate failed: %s", uv_strerror(rc)));
657 wt->terminate_val = js_mkundef();
658 wt->has_terminate_val = false;
659 return p;
660 }
661
662 return wt->terminate_val;
663}
664
665static ant_value_t worker_threads_worker_post_message(ant_t *js, ant_value_t *args, int nargs) {
666 return js_mkerr(js, "Worker.postMessage is not implemented yet");
667}
668
669static ant_value_t worker_threads_message_port_post_message(ant_t *js, ant_value_t *args, int nargs) {
670 ant_value_t this_obj = js_getthis(js);
671 if (!wt_is_message_port(js, this_obj)) return js_mkerr(js, "invalid MessagePort receiver");
672 if (wt_port_is_closed(js, this_obj)) return js_mkundef();
673
674 ant_value_t peer = js_get_slot(this_obj, SLOT_WT_PORT_PEER);
675 if (!wt_is_message_port(js, peer) || wt_port_is_closed(js, peer)) return js_mkundef();
676
677 ant_value_t value = (nargs > 0) ? args[0] : js_mkundef();
678 wt_port_queue_push(js, peer, value);
679 wt_port_drain(js, peer);
680 return js_mkundef();
681}
682
683static ant_value_t worker_threads_message_port_on(ant_t *js, ant_value_t *args, int nargs) {
684 if (nargs < 2 || vtype(args[0]) != T_STR || !is_callable(args[1])) {
685 return js_mkerr(js, "MessagePort.on(event, listener) requires (string, function)");
686 }
687 ant_value_t this_obj = js_getthis(js);
688 if (!wt_is_message_port(js, this_obj)) return js_mkerr(js, "invalid MessagePort receiver");
689
690 size_t len = 0;
691 const char *event = js_getstr(js, args[0], &len);
692 if (!event) return js_mkerr(js, "invalid event name");
693 if (len == 7 && memcmp(event, "message", 7) == 0) {
694 js_set_slot(this_obj, SLOT_WT_PORT_ON_MESSAGE, args[1]);
695 js_set_slot(this_obj, SLOT_WT_PORT_STARTED, js_true);
696 wt_port_drain(js, this_obj);
697 }
698 return this_obj;
699}
700
701static ant_value_t worker_threads_message_port_once(ant_t *js, ant_value_t *args, int nargs) {
702 if (nargs < 2 || vtype(args[0]) != T_STR || !is_callable(args[1])) {
703 return js_mkerr(js, "MessagePort.once(event, listener) requires (string, function)");
704 }
705 ant_value_t this_obj = js_getthis(js);
706 if (!wt_is_message_port(js, this_obj)) return js_mkerr(js, "invalid MessagePort receiver");
707
708 size_t len = 0;
709 const char *event = js_getstr(js, args[0], &len);
710 if (!event) return js_mkerr(js, "invalid event name");
711 if (len == 7 && memcmp(event, "message", 7) == 0) {
712 js_set_slot(this_obj, SLOT_WT_PORT_ONCE_MESSAGE, args[1]);
713 js_set_slot(this_obj, SLOT_WT_PORT_STARTED, js_true);
714 wt_port_drain(js, this_obj);
715 }
716 return this_obj;
717}
718
719static ant_value_t worker_threads_message_port_start(ant_t *js, ant_value_t *args, int nargs) {
720 ant_value_t this_obj = js_getthis(js);
721 if (!wt_is_message_port(js, this_obj)) return js_mkerr(js, "invalid MessagePort receiver");
722 js_set_slot(this_obj, SLOT_WT_PORT_STARTED, js_true);
723 wt_port_drain(js, this_obj);
724 return js_mkundef();
725}
726
727static ant_value_t worker_threads_message_port_close(ant_t *js, ant_value_t *args, int nargs) {
728 ant_value_t this_obj = js_getthis(js);
729 if (!wt_is_message_port(js, this_obj)) return js_mkerr(js, "invalid MessagePort receiver");
730 wt_port_set_closed(js, this_obj, true);
731
732 ant_value_t peer = js_get_slot(this_obj, SLOT_WT_PORT_PEER);
733 js_set_slot(this_obj, SLOT_WT_PORT_PEER, js_mknull());
734 if (wt_is_message_port(js, peer)) js_set_slot(peer, SLOT_WT_PORT_PEER, js_mknull());
735 return js_mkundef();
736}
737
738static ant_value_t worker_threads_message_port_ref(ant_t *js, ant_value_t *args, int nargs) {
739 return js_mkundef();
740}
741
742static ant_value_t worker_threads_message_port_unref(ant_t *js, ant_value_t *args, int nargs) {
743 return js_mkundef();
744}
745
746static ant_value_t worker_threads_message_port_ctor(ant_t *js, ant_value_t *args, int nargs) {
747 return js_mkerr(js, "MessagePort constructor is not public");
748}
749
750static ant_value_t worker_threads_message_channel_ctor(ant_t *js, ant_value_t *args, int nargs) {
751 if (vtype(js->new_target) == T_UNDEF) {
752 return js_mkerr(js, "MessageChannel constructor requires 'new'");
753 }
754
755 ant_value_t this_obj = js_getthis(js);
756 ant_value_t port1 = wt_make_message_port(js);
757 ant_value_t port2 = wt_make_message_port(js);
758 js_set_slot(port1, SLOT_WT_PORT_PEER, port2);
759 js_set_slot(port2, SLOT_WT_PORT_PEER, port1);
760 js_set(js, this_obj, "port1", port1);
761 js_set(js, this_obj, "port2", port2);
762 return this_obj;
763}
764
765static ant_value_t worker_threads_worker_ctor(ant_t *js, ant_value_t *args, int nargs) {
766 ant_value_t this_obj = js_getthis(js);
767 ant_value_t proto = js_instance_proto_from_new_target(js, js_mkundef());
768
769 if (vtype(js->new_target) == T_UNDEF) {
770 return js_mkerr(js, "Worker constructor requires 'new'");
771 }
772 if (nargs < 1) return js_mkerr(js, "Worker() requires a filename or URL");
773 if (is_object_type(this_obj) && is_object_type(proto)) js_set_proto_init(this_obj, proto);
774
775 char *script_path = wt_path_from_specifier(js, args[0]);
776 if (!script_path) return js_mkerr(js, "Invalid Worker filename/URL");
777
778 const char *worker_data_json = NULL;
779 const char *env_store_json = NULL;
780 char *worker_data_heap = NULL;
781 char *env_store_heap = NULL;
782 if (nargs >= 2 && is_object_type(args[1])) {
783 ant_value_t worker_data = js_get(js, args[1], "workerData");
784 if (!is_undefined(worker_data)) {
785 ant_value_t stringify_args[1] = {worker_data};
786 ant_value_t json = js_json_stringify(js, stringify_args, 1);
787 if (vtype(json) != T_STR) {
788 free(script_path);
789 return js_mkerr(js, "Worker options.workerData must be JSON-serializable");
790 }
791 size_t len = 0;
792 const char *raw = js_getstr(js, json, &len);
793 if (!raw) {
794 free(script_path);
795 return js_mkerr(js, "Failed to stringify workerData");
796 }
797 worker_data_heap = strndup(raw, len);
798 if (!worker_data_heap) {
799 free(script_path);
800 return js_mkerr(js, "Out of memory");
801 }
802 worker_data_json = worker_data_heap;
803 }
804 }
805
806 ant_value_t env_store = wt_get_or_create_env_store(js);
807 ant_value_t env_stringify_args[1] = {env_store};
808 ant_value_t env_json = js_json_stringify(js, env_stringify_args, 1);
809 if (vtype(env_json) != T_STR) {
810 free(script_path);
811 free(worker_data_heap);
812 return js_mkerr(js, "setEnvironmentData values must be JSON-serializable");
813 }
814 size_t env_len = 0;
815 const char *env_raw = js_getstr(js, env_json, &env_len);
816 if (!env_raw) {
817 free(script_path);
818 free(worker_data_heap);
819 return js_mkerr(js, "Failed to snapshot environment data");
820 }
821 env_store_heap = strndup(env_raw, env_len);
822 if (!env_store_heap) {
823 free(script_path);
824 free(worker_data_heap);
825 return js_mkerr(js, "Out of memory");
826 }
827 env_store_json = env_store_heap;
828
829 ant_worker_thread_t *wt = (ant_worker_thread_t *)calloc(1, sizeof(*wt));
830 if (!wt) {
831 free(script_path);
832 free(worker_data_heap);
833 free(env_store_heap);
834 return js_mkerr(js, "Out of memory");
835 }
836
837 wt->js = js;
838 wt->self_val = this_obj;
839 js_set_slot(this_obj, SLOT_DATA, ANT_PTR(wt));
840
841 wt->prev = NULL;
842 wt->next = active_workers_head;
843 if (active_workers_head) active_workers_head->prev = wt;
844 active_workers_head = wt;
845
846 int rc = wt_spawn_worker(wt, script_path, worker_data_json, env_store_json);
847 free(script_path);
848 free(worker_data_heap);
849 free(env_store_heap);
850
851 if (rc != 0) {
852 js_set_slot(this_obj, SLOT_DATA, js_mkundef());
853 wt_cleanup(wt);
854 return js_mkerr(js, "Failed to spawn Worker: %s", uv_strerror(rc));
855 }
856
857 return this_obj;
858}
859
860static ant_value_t worker_threads_parent_post_message(ant_t *js, ant_value_t *args, int nargs) {
861 ant_value_t value = (nargs > 0) ? args[0] : js_mkundef();
862 ant_value_t stringify_args[1] = {value};
863 ant_value_t json = js_json_stringify(js, stringify_args, 1);
864 if (vtype(json) != T_STR) return js_mkerr(js, "parentPort.postMessage payload must be JSON-serializable");
865
866 size_t json_len = 0;
867 const char *json_str = js_getstr(js, json, &json_len);
868 if (!json_str) return js_mkerr(js, "Failed to serialize message");
869
870 size_t prefix_len = strlen(WT_MSG_PREFIX);
871 size_t line_len = prefix_len + json_len + 1;
872 char *line = (char *)malloc(line_len + 1);
873 if (!line) return js_mkerr(js, "Out of memory");
874
875 memcpy(line, WT_MSG_PREFIX, prefix_len);
876 memcpy(line + prefix_len, json_str, json_len);
877 line[prefix_len + json_len] = '\n';
878 line[prefix_len + json_len + 1] = '\0';
879
880 ssize_t wrote = WT_WRITE(WT_STDOUT_FD, line, (unsigned int)line_len);
881 free(line);
882 if (wrote < 0) return js_mkerr(js, "parentPort.postMessage failed");
883 return js_mkundef();
884}
885
886static ant_value_t worker_threads_parent_unref(ant_t *js, ant_value_t *args, int nargs) {
887 return js_mkundef();
888}
889
890static ant_value_t worker_threads_mark_as_untransferable(ant_t *js, ant_value_t *args, int nargs) {
891 if (nargs < 1) return js_mkundef();
892 return args[0];
893}
894
895static ant_value_t worker_threads_receive_message_on_port(ant_t *js, ant_value_t *args, int nargs) {
896 if (nargs < 1 || !wt_is_message_port(js, args[0])) {
897 return js_mkerr(js, "receiveMessageOnPort(port) requires a MessagePort");
898 }
899 ant_value_t msg = js_mkundef();
900 if (!wt_port_queue_dequeue(js, args[0], &msg)) return js_mkundef();
901
902 ant_value_t out = js_mkobj(js);
903 js_set(js, out, "message", msg);
904 return out;
905}
906
907static ant_value_t worker_threads_set_environment_data(ant_t *js, ant_value_t *args, int nargs) {
908 if (nargs < 2) return js_mkerr(js, "setEnvironmentData(key, value) requires 2 arguments");
909
910 ant_value_t key_stringify_args[1] = {args[0]};
911 ant_value_t key_json = js_json_stringify(js, key_stringify_args, 1);
912 if (vtype(key_json) != T_STR) return js_mkerr(js, "setEnvironmentData key must be JSON-serializable");
913
914 ant_value_t value_stringify_args[1] = {args[1]};
915 ant_value_t value_json = js_json_stringify(js, value_stringify_args, 1);
916 if (vtype(value_json) != T_STR) return js_mkerr(js, "setEnvironmentData value must be JSON-serializable");
917
918 ant_value_t cloned = json_parse_value(js, value_json);
919 if (is_err(cloned)) return js_mkerr(js, "setEnvironmentData value must be JSON-serializable");
920
921 size_t key_len = 0;
922 const char *key_ptr = js_getstr(js, key_json, &key_len);
923 if (!key_ptr) return js_mkerr(js, "Failed to serialize environment data key");
924
925 char *key = strndup(key_ptr, key_len);
926 if (!key) return js_mkerr(js, "Out of memory");
927 js_set(js, wt_get_or_create_env_store(js), key, cloned);
928 free(key);
929 return js_mkundef();
930}
931
932static ant_value_t worker_threads_get_environment_data(ant_t *js, ant_value_t *args, int nargs) {
933 if (nargs < 1) return js_mkundef();
934
935 ant_value_t key_stringify_args[1] = {args[0]};
936 ant_value_t key_json = js_json_stringify(js, key_stringify_args, 1);
937 if (vtype(key_json) != T_STR) return js_mkundef();
938
939 size_t key_len = 0;
940 const char *key_ptr = js_getstr(js, key_json, &key_len);
941 if (!key_ptr) return js_mkundef();
942
943 char *key = strndup(key_ptr, key_len);
944 if (!key) return js_mkerr(js, "Out of memory");
945
946 ant_value_t store = wt_get_or_create_env_store(js);
947 ant_offset_t off = lkp(js, store, key, key_len);
948 if (off == 0) {
949 free(key);
950 return js_mkundef();
951 }
952
953 ant_value_t value = js_get(js, store, key);
954 free(key);
955 return value;
956}
957
958static ant_value_t worker_threads_move_message_port_to_context(ant_t *js, ant_value_t *args, int nargs) {
959 if (nargs < 1 || !wt_is_message_port(js, args[0])) {
960 return js_mkerr(js, "moveMessagePortToContext(port, context) requires a MessagePort");
961 }
962 return args[0];
963}
964
965void gc_mark_worker_threads(ant_t *js, gc_mark_fn mark) {
966 for (ant_worker_thread_t *wt = active_workers_head; wt; wt = wt->next) {
967 mark(js, wt->self_val);
968 if (wt->has_terminate_val) mark(js, wt->terminate_val);
969 }
970}
971
972ant_value_t worker_threads_library(ant_t *js) {
973 ant_value_t lib = js_mkobj(js);
974 bool is_worker = wt_is_worker_mode();
975
976 wt_init_env_store(js, is_worker);
977
978 js_set(js, lib, "isMainThread", js_bool(!is_worker));
979 js_set(js, lib, "threadId", js_mknum((double)(is_worker ? rt->pid : 0)));
980 js_set(js, lib, "SHARE_ENV", js_mksym(js, "SHARE_ENV"));
981
982 ant_value_t message_port_ctor_obj = js_mkobj(js);
983 ant_value_t message_port_proto = js_mkobj(js);
984 js_set(js, message_port_proto, "postMessage", js_mkfun(worker_threads_message_port_post_message));
985 js_set(js, message_port_proto, "on", js_mkfun(worker_threads_message_port_on));
986 js_set(js, message_port_proto, "once", js_mkfun(worker_threads_message_port_once));
987 js_set(js, message_port_proto, "start", js_mkfun(worker_threads_message_port_start));
988 js_set(js, message_port_proto, "close", js_mkfun(worker_threads_message_port_close));
989 js_set(js, message_port_proto, "ref", js_mkfun(worker_threads_message_port_ref));
990 js_set(js, message_port_proto, "unref", js_mkfun(worker_threads_message_port_unref));
991 js_set_sym(js, message_port_proto, get_toStringTag_sym(), js_mkstr(js, "MessagePort", 11));
992
993 js_set_slot(message_port_ctor_obj, SLOT_CFUNC, js_mkfun(worker_threads_message_port_ctor));
994 js_mkprop_fast(js, message_port_ctor_obj, "prototype", 9, message_port_proto);
995 js_mkprop_fast(js, message_port_ctor_obj, "name", 4, js_mkstr(js, "MessagePort", 11));
996 js_set_descriptor(js, message_port_ctor_obj, "name", 4, 0);
997 js_set(js, lib, "MessagePort", js_obj_to_func(message_port_ctor_obj));
998 js_set_slot(js->global, SLOT_WT_PORT_PROTO, message_port_proto);
999
1000 ant_value_t message_channel_ctor_obj = js_mkobj(js);
1001 ant_value_t message_channel_proto = js_mkobj(js);
1002 js_set_sym(js, message_channel_proto, get_toStringTag_sym(), js_mkstr(js, "MessageChannel", 14));
1003 js_set_slot(message_channel_ctor_obj, SLOT_CFUNC, js_mkfun(worker_threads_message_channel_ctor));
1004 js_mkprop_fast(js, message_channel_ctor_obj, "prototype", 9, message_channel_proto);
1005 js_mkprop_fast(js, message_channel_ctor_obj, "name", 4, js_mkstr(js, "MessageChannel", 14));
1006 js_set_descriptor(js, message_channel_ctor_obj, "name", 4, 0);
1007 js_set(js, lib, "MessageChannel", js_obj_to_func(message_channel_ctor_obj));
1008
1009 if (is_worker) {
1010 ant_value_t parent_port = js_mkobj(js);
1011 js_set(js, parent_port, "postMessage", js_mkfun(worker_threads_parent_post_message));
1012 js_set(js, parent_port, "unref", js_mkfun(worker_threads_parent_unref));
1013 js_set(js, parent_port, "ref", js_mkfun(worker_threads_parent_unref));
1014 js_set(js, lib, "parentPort", parent_port);
1015
1016 const char *worker_data_json = getenv(WT_ENV_DATA_JSON);
1017 if (worker_data_json && worker_data_json[0]) {
1018 ant_value_t raw = js_mkstr(js, worker_data_json, strlen(worker_data_json));
1019 ant_value_t parsed = json_parse_value(js, raw);
1020 js_set(js, lib, "workerData", is_err(parsed) ? js_mkundef() : parsed);
1021 } else js_set(js, lib, "workerData", js_mkundef());
1022 } else {
1023 js_set(js, lib, "parentPort", js_mknull());
1024 js_set(js, lib, "workerData", js_mkundef());
1025 }
1026
1027 ant_value_t worker_ctor_obj = js_mkobj(js);
1028 ant_value_t worker_proto = js_mkobj(js);
1029
1030 js_set(js, worker_proto, "on", js_mkfun(worker_threads_worker_on));
1031 js_set(js, worker_proto, "once", js_mkfun(worker_threads_worker_once));
1032 js_set(js, worker_proto, "terminate", js_mkfun(worker_threads_worker_terminate));
1033 js_set(js, worker_proto, "unref", js_mkfun(worker_threads_worker_unref));
1034 js_set(js, worker_proto, "ref", js_mkfun(worker_threads_worker_ref));
1035 js_set(js, worker_proto, "postMessage", js_mkfun(worker_threads_worker_post_message));
1036 js_set_sym(js, worker_proto, get_toStringTag_sym(), js_mkstr(js, "Worker", 6));
1037
1038 js_set_slot(worker_ctor_obj, SLOT_CFUNC, js_mkfun(worker_threads_worker_ctor));
1039 js_mkprop_fast(js, worker_ctor_obj, "prototype", 9, worker_proto);
1040 js_mkprop_fast(js, worker_ctor_obj, "name", 4, js_mkstr(js, "Worker", 6));
1041 js_set_descriptor(js, worker_ctor_obj, "name", 4, 0);
1042 js_set(js, lib, "Worker", js_obj_to_func(worker_ctor_obj));
1043
1044 js_set(js, lib, "markAsUntransferable", js_mkfun(worker_threads_mark_as_untransferable));
1045 js_set(js, lib, "receiveMessageOnPort", js_mkfun(worker_threads_receive_message_on_port));
1046 js_set(js, lib, "setEnvironmentData", js_mkfun(worker_threads_set_environment_data));
1047 js_set(js, lib, "getEnvironmentData", js_mkfun(worker_threads_get_environment_data));
1048 js_set(js, lib, "moveMessagePortToContext", js_mkfun(worker_threads_move_message_port_to_context));
1049
1050 js_set_sym(js, lib, get_toStringTag_sym(), js_mkstr(js, "worker_threads", 14));
1051 return lib;
1052}