MIRROR: javascript for 馃悳's, a tiny runtime with big ambitions
1// stub: minimal node:worker_threads implementation
2// just enough for rolldown to run transforms
3
4#include <compat.h> // IWYU pragma: keep
5
6#include <uv.h>
7#include <stdio.h>
8#include <stdlib.h>
9#include <string.h>
10#include <signal.h>
11
12#ifdef _WIN32
13#include <io.h>
14#define WT_WRITE _write
15#define WT_STDOUT_FD 1
16extern char **_environ;
17#define environ _environ
18#else
19#include <unistd.h>
20#define WT_WRITE write
21#define WT_STDOUT_FD STDOUT_FILENO
22extern char **environ;
23#endif
24
25#include "ant.h"
26#include "ptr.h"
27#include "internal.h"
28#include "runtime.h"
29#include "descriptors.h"
30#include "silver/engine.h"
31#include "modules/json.h"
32#include "modules/symbol.h"
33#include "modules/worker_threads.h"
34#include "gc/modules.h"
35
36#define WT_ENV_MODE "ANT_WORKER_THREADS_MODE"
37#define WT_ENV_DATA_JSON "ANT_WORKER_DATA_JSON"
38#define WT_ENV_STORE_JSON "ANT_WORKER_ENV_DATA_JSON"
39#define WT_MSG_PREFIX "ANT_WT_MSG:"
40
41typedef struct ant_worker_thread {
42 ant_t *js;
43 uv_process_t process;
44 uv_pipe_t stdout_pipe;
45 bool spawned;
46 bool exited;
47 bool closing;
48 bool closed;
49 bool refed;
50 int close_pending;
51 int64_t exit_status;
52 int term_signal;
53 char *line_buf;
54 size_t line_len;
55 size_t line_cap;
56 ant_value_t self_val;
57 ant_value_t terminate_val;
58 bool has_terminate_val;
59 struct ant_worker_thread *next;
60 struct ant_worker_thread *prev;
61} ant_worker_thread_t;
62
63static ant_worker_thread_t *active_workers_head = NULL;
64
65enum { WORKER_NATIVE_TAG = 0x57524b52u }; // WRKR
66
67static bool wt_is_worker_mode(void) {
68 const char *mode = getenv(WT_ENV_MODE);
69 return mode && strcmp(mode, "1") == 0;
70}
71
72static ant_value_t wt_get_or_create_env_store(ant_t *js) {
73 ant_value_t store = js_get_slot(js->global, SLOT_WT_ENV_STORE);
74 if (!is_object_type(store)) {
75 store = js_mkobj(js);
76 js_set_slot(js->global, SLOT_WT_ENV_STORE, store);
77 }
78 return store;
79}
80
81static void wt_init_env_store(ant_t *js, bool is_worker) {
82 ant_value_t store = js_mkobj(js);
83 if (is_worker) {
84 const char *raw = getenv(WT_ENV_STORE_JSON);
85 if (raw && raw[0]) {
86 ant_value_t input = js_mkstr(js, raw, strlen(raw));
87 ant_value_t parsed = json_parse_value(js, input);
88 if (is_object_type(parsed)) store = parsed;
89 }
90 }
91 js_set_slot(js->global, SLOT_WT_ENV_STORE, store);
92}
93
94static ant_worker_thread_t *wt_get_worker(ant_t *js, ant_value_t this_obj) {
95 if (!is_object_type(this_obj)) return NULL;
96 return (ant_worker_thread_t *)js_get_native(this_obj, WORKER_NATIVE_TAG);
97}
98
99static bool wt_is_message_port(ant_t *js, ant_value_t obj) {
100 if (!is_object_type(obj)) return false;
101 ant_value_t tag = js_get_slot(obj, SLOT_WT_PORT_TAG);
102 return js_truthy(js, tag);
103}
104
105static ant_value_t wt_get_message_port_proto(ant_t *js) {
106 ant_value_t proto = js_get_slot(js->global, SLOT_WT_PORT_PROTO);
107 if (!is_object_type(proto)) proto = js_mkobj(js);
108 return proto;
109}
110
111static ant_value_t wt_make_message_port(ant_t *js) {
112 ant_value_t port = js_mkobj(js);
113 js_set_proto_init(port, wt_get_message_port_proto(js));
114 js_set_slot(port, SLOT_WT_PORT_TAG, js_true);
115 js_set_slot(port, SLOT_WT_PORT_QUEUE, js_mkarr(js));
116 js_set_slot(port, SLOT_WT_PORT_HEAD, js_mknum(0));
117 js_set_slot(port, SLOT_WT_PORT_PEER, js_mknull());
118 js_set_slot(port, SLOT_WT_PORT_CLOSED, js_false);
119 js_set_slot(port, SLOT_WT_PORT_STARTED, js_false);
120 js_set_slot(port, SLOT_WT_PORT_ON_MESSAGE, js_mkundef());
121 js_set_slot(port, SLOT_WT_PORT_ONCE_MESSAGE, js_mkundef());
122 js_set(js, port, "onmessage", js_mkundef());
123 return port;
124}
125
126static bool wt_port_is_closed(ant_t *js, ant_value_t port) {
127 return js_truthy(js, js_get_slot(port, SLOT_WT_PORT_CLOSED));
128}
129
130static void wt_port_set_closed(ant_t *js, ant_value_t port, bool closed) {
131 js_set_slot(port, SLOT_WT_PORT_CLOSED, js_bool(closed));
132}
133
134static bool wt_port_queue_dequeue(ant_t *js, ant_value_t port, ant_value_t *out) {
135 ant_value_t queue = js_get_slot(port, SLOT_WT_PORT_QUEUE);
136 if (vtype(queue) != T_ARR) return false;
137
138 ant_value_t head_val = js_get_slot(port, SLOT_WT_PORT_HEAD);
139 ant_offset_t head = (vtype(head_val) == T_NUM) ? (ant_offset_t)js_getnum(head_val) : 0;
140 ant_offset_t len = js_arr_len(js, queue);
141 if (len <= 0 || head >= len) {
142 js_set_slot(port, SLOT_WT_PORT_QUEUE, js_mkarr(js));
143 js_set_slot(port, SLOT_WT_PORT_HEAD, js_mknum(0));
144 return false;
145 }
146
147 if (out) *out = js_arr_get(js, queue, head);
148 ant_offset_t next_head = head + 1;
149
150 if (next_head >= len) {
151 js_set_slot(port, SLOT_WT_PORT_QUEUE, js_mkarr(js));
152 js_set_slot(port, SLOT_WT_PORT_HEAD, js_mknum(0));
153 return true;
154 }
155
156 if (next_head > 32 && next_head * 2 >= len) {
157 ant_value_t compact = js_mkarr(js);
158 for (ant_offset_t i = next_head; i < len; i++) js_arr_push(js, compact, js_arr_get(js, queue, i));
159 js_set_slot(port, SLOT_WT_PORT_QUEUE, compact);
160 js_set_slot(port, SLOT_WT_PORT_HEAD, js_mknum(0));
161 return true;
162 }
163
164 js_set_slot(port, SLOT_WT_PORT_HEAD, js_mknum((double)next_head));
165 return true;
166}
167
168static void wt_port_queue_push(ant_t *js, ant_value_t port, ant_value_t value) {
169 ant_value_t queue = js_get_slot(port, SLOT_WT_PORT_QUEUE);
170 if (vtype(queue) != T_ARR) {
171 queue = js_mkarr(js);
172 js_set_slot(port, SLOT_WT_PORT_QUEUE, queue);
173 js_set_slot(port, SLOT_WT_PORT_HEAD, js_mknum(0));
174 }
175 js_arr_push(js, queue, value);
176}
177
178static void wt_port_call_listener(ant_t *js, ant_value_t this_obj, ant_value_t fn, ant_value_t arg) {
179 if (!is_callable(fn)) return;
180 ant_value_t argv[1] = {arg};
181
182 sv_vm_call(js->vm, js, fn, this_obj, argv, 1, NULL, false);
183}
184
185static bool wt_port_should_deliver(ant_t *js, ant_value_t port) {
186 if (wt_port_is_closed(js, port)) return false;
187 bool started = js_truthy(js, js_get_slot(port, SLOT_WT_PORT_STARTED));
188 ant_value_t on_fn = js_get_slot(port, SLOT_WT_PORT_ON_MESSAGE);
189 ant_value_t once_fn = js_get_slot(port, SLOT_WT_PORT_ONCE_MESSAGE);
190 ant_value_t onmessage = js_get(js, port, "onmessage");
191
192 bool has_event_listener = is_callable(on_fn) || is_callable(once_fn);
193 if (is_callable(onmessage)) return true;
194 return started && has_event_listener;
195}
196
197static void wt_port_drain(ant_t *js, ant_value_t port) {
198 if (!wt_is_message_port(js, port)) return;
199
200 while (wt_port_should_deliver(js, port)) {
201 ant_value_t msg = js_mkundef();
202 if (!wt_port_queue_dequeue(js, port, &msg)) break;
203
204 ant_value_t on_fn = js_get_slot(port, SLOT_WT_PORT_ON_MESSAGE);
205 wt_port_call_listener(js, port, on_fn, msg);
206
207 ant_value_t once_fn = js_get_slot(port, SLOT_WT_PORT_ONCE_MESSAGE);
208 if (is_callable(once_fn)) {
209 wt_port_call_listener(js, port, once_fn, msg);
210 js_set_slot(port, SLOT_WT_PORT_ONCE_MESSAGE, js_mkundef());
211 }
212
213 ant_value_t onmessage = js_get(js, port, "onmessage");
214 if (is_callable(onmessage)) {
215 ant_value_t event_obj = js_mkobj(js);
216 js_set(js, event_obj, "data", msg);
217 wt_port_call_listener(js, port, onmessage, event_obj);
218 }
219
220 if (wt_port_is_closed(js, port)) break;
221 }
222}
223
224static ant_value_t wt_make_resolved_promise(ant_t *js, ant_value_t value) {
225 ant_value_t p = js_mkpromise(js);
226 js_resolve_promise(js, p, value);
227 return p;
228}
229
230static void wt_call_listener(ant_t *js, ant_value_t this_obj, ant_value_t fn, ant_value_t arg) {
231 if (!is_callable(fn)) return;
232 ant_value_t argv[1] = {arg};
233
234 sv_vm_call(js->vm, js, fn, this_obj, argv, 1, NULL, false);
235}
236
237static void wt_emit(ant_worker_thread_t *wt, const char *event, ant_value_t arg) {
238 if (!wt || !wt->js) return;
239 ant_t *js = wt->js;
240 ant_value_t this_obj = wt->self_val;
241 if (!is_object_type(this_obj)) return;
242
243 internal_slot_t on_slot, once_slot;
244 if (strcmp(event, "message") == 0) {
245 on_slot = SLOT_WT_ON_MESSAGE;
246 once_slot = SLOT_WT_ONCE_MESSAGE;
247 } else if (strcmp(event, "exit") == 0) {
248 on_slot = SLOT_WT_ON_EXIT;
249 once_slot = SLOT_WT_ONCE_EXIT;
250 } else return;
251
252 ant_value_t on_fn = js_get_slot(this_obj, on_slot);
253 wt_call_listener(js, this_obj, on_fn, arg);
254
255 ant_value_t once_fn = js_get_slot(this_obj, once_slot);
256 if (is_callable(once_fn)) {
257 wt_call_listener(js, this_obj, once_fn, arg);
258 js_set_slot(this_obj, once_slot, js_mkundef());
259 }
260}
261
262static void wt_free_env(char **env) {
263 if (!env) return;
264 for (char **p = env; *p; p++) free(*p);
265 free(env);
266}
267
268static char **wt_build_worker_env(const char *worker_data_json, const char *env_store_json) {
269 size_t count = 0;
270 if (environ) {
271 while (environ[count]) count++;
272 }
273
274 size_t extra = 2;
275 if (worker_data_json) extra++;
276 if (env_store_json) extra++;
277 char **env = (char **)calloc(count + extra, sizeof(char *));
278 if (!env) return NULL;
279
280 size_t out = 0;
281 for (size_t i = 0; i < count; i++) {
282 env[out] = strdup(environ[i]);
283 if (!env[out]) {
284 wt_free_env(env);
285 return NULL;
286 }
287 out++;
288 }
289
290 env[out++] = strdup(WT_ENV_MODE "=1");
291 if (!env[out - 1]) {
292 wt_free_env(env);
293 return NULL;
294 }
295
296 if (worker_data_json) {
297 size_t key_len = strlen(WT_ENV_DATA_JSON);
298 size_t val_len = strlen(worker_data_json);
299 char *entry = (char *)malloc(key_len + 1 + val_len + 1);
300 if (!entry) {
301 wt_free_env(env);
302 return NULL;
303 }
304 memcpy(entry, WT_ENV_DATA_JSON, key_len);
305 entry[key_len] = '=';
306 memcpy(entry + key_len + 1, worker_data_json, val_len);
307 entry[key_len + 1 + val_len] = '\0';
308 env[out++] = entry;
309 }
310
311 if (env_store_json) {
312 size_t key_len = strlen(WT_ENV_STORE_JSON);
313 size_t val_len = strlen(env_store_json);
314 char *entry = (char *)malloc(key_len + 1 + val_len + 1);
315 if (!entry) {
316 wt_free_env(env);
317 return NULL;
318 }
319 memcpy(entry, WT_ENV_STORE_JSON, key_len);
320 entry[key_len] = '=';
321 memcpy(entry + key_len + 1, env_store_json, val_len);
322 entry[key_len + 1 + val_len] = '\0';
323 env[out++] = entry;
324 }
325
326 env[out] = NULL;
327 return env;
328}
329
330static void wt_cleanup(ant_worker_thread_t *wt) {
331 if (!wt) return;
332 free(wt->line_buf);
333 free(wt);
334}
335
336static void wt_detach(ant_worker_thread_t *wt) {
337 if (!wt) return;
338
339 if (wt->prev) wt->prev->next = wt->next;
340 else active_workers_head = wt->next;
341 if (wt->next) wt->next->prev = wt->prev;
342 wt->prev = NULL;
343 wt->next = NULL;
344
345 wt->self_val = js_mkundef();
346 wt->terminate_val = js_mkundef();
347 wt->has_terminate_val = false;
348 free(wt->line_buf);
349 wt->line_buf = NULL;
350 wt->line_len = 0;
351 wt->line_cap = 0;
352 wt->spawned = false;
353 wt->refed = false;
354 wt->closed = true;
355 wt->js = NULL;
356}
357
358static void wt_on_handle_closed(uv_handle_t *h) {
359 ant_worker_thread_t *wt = (ant_worker_thread_t *)h->data;
360 if (!wt) return;
361 if (wt->close_pending > 0) wt->close_pending--;
362 if (wt->close_pending == 0) wt_detach(wt);
363}
364
365static void wt_finish_exit(ant_worker_thread_t *wt) {
366 if (!wt) return;
367 if (wt->closed || wt->closing) return;
368 wt->closing = true;
369 wt->spawned = false;
370 wt->refed = false;
371 wt->close_pending = 0;
372
373 if (!uv_is_closing((uv_handle_t *)&wt->stdout_pipe)) {
374 wt->close_pending++;
375 uv_close((uv_handle_t *)&wt->stdout_pipe, wt_on_handle_closed);
376 }
377 if (!uv_is_closing((uv_handle_t *)&wt->process)) {
378 wt->close_pending++;
379 uv_close((uv_handle_t *)&wt->process, wt_on_handle_closed);
380 }
381
382 if (wt->close_pending == 0) wt_detach(wt);
383}
384
385static void wt_on_process_exit(uv_process_t *proc, int64_t exit_status, int term_signal) {
386 ant_worker_thread_t *wt = (ant_worker_thread_t *)proc->data;
387 if (!wt || !wt->js) return;
388
389 wt->exited = true;
390 wt->exit_status = exit_status;
391 wt->term_signal = term_signal;
392
393 if (wt->has_terminate_val) {
394 ant_value_t p = wt->terminate_val;
395 js_resolve_promise(wt->js, p, js_mknum((double)exit_status));
396 wt->terminate_val = js_mkundef();
397 wt->has_terminate_val = false;
398 }
399
400 wt_emit(wt, "exit", js_mknum((double)exit_status));
401 wt_finish_exit(wt);
402}
403
404static void wt_alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
405 buf->base = (char *)malloc(suggested_size);
406 buf->len = buf->base ? suggested_size : 0;
407}
408
409static void wt_emit_message_from_json(ant_worker_thread_t *wt, const char *json, size_t len) {
410 if (!wt || !wt->js) return;
411 ant_t *js = wt->js;
412
413 ant_value_t s = js_mkstr(js, json, len);
414 ant_value_t msg = json_parse_value(js, s);
415 if (is_err(msg)) msg = s;
416
417 wt_emit(wt, "message", msg);
418}
419
420static void wt_process_lines(ant_worker_thread_t *wt) {
421 if (!wt || !wt->line_buf) return;
422
423 for (;;) {
424 char *nl = memchr(wt->line_buf, '\n', wt->line_len);
425 if (!nl) break;
426
427 size_t line_len = (size_t)(nl - wt->line_buf);
428 if (line_len > 0 && wt->line_buf[line_len - 1] == '\r') line_len--;
429
430 size_t prefix_len = strlen(WT_MSG_PREFIX);
431 if (line_len >= prefix_len && memcmp(wt->line_buf, WT_MSG_PREFIX, prefix_len) == 0) {
432 const char *payload = wt->line_buf + prefix_len;
433 size_t payload_len = line_len - prefix_len;
434 wt_emit_message_from_json(wt, payload, payload_len);
435 }
436
437 size_t consumed = (size_t)(nl - wt->line_buf) + 1;
438 size_t remain = wt->line_len - consumed;
439 memmove(wt->line_buf, wt->line_buf + consumed, remain);
440 wt->line_len = remain;
441 }
442}
443
444static bool wt_append_stdout(ant_worker_thread_t *wt, const char *data, size_t len) {
445 if (!wt || !data || len == 0) return true;
446
447 size_t needed = wt->line_len + len;
448 if (needed + 1 > wt->line_cap) {
449 size_t cap = wt->line_cap ? wt->line_cap : 1024;
450 while (cap < needed + 1) cap *= 2;
451 char *next = (char *)realloc(wt->line_buf, cap);
452 if (!next) return false;
453 wt->line_buf = next;
454 wt->line_cap = cap;
455 }
456
457 memcpy(wt->line_buf + wt->line_len, data, len);
458 wt->line_len += len;
459 wt->line_buf[wt->line_len] = '\0';
460 return true;
461}
462
463static void wt_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
464 ant_worker_thread_t *wt = (ant_worker_thread_t *)stream->data;
465 if (!wt) {
466 free(buf->base);
467 return;
468 }
469
470 if (nread > 0) {
471 if (wt_append_stdout(wt, buf->base, (size_t)nread)) wt_process_lines(wt);
472 } else if (nread < 0) {
473 uv_read_stop(stream);
474 }
475
476 free(buf->base);
477}
478
479static char *wt_path_from_specifier(ant_t *js, ant_value_t spec) {
480 const char *raw = NULL;
481 size_t len = 0;
482
483 if (vtype(spec) == T_STR) {
484 raw = js_getstr(js, spec, &len);
485 } else if (is_object_type(spec)) {
486 ant_value_t pathname = js_get(js, spec, "pathname");
487 if (vtype(pathname) == T_STR) raw = js_getstr(js, pathname, &len);
488 if (!raw) {
489 ant_value_t href = js_get(js, spec, "href");
490 if (vtype(href) == T_STR) raw = js_getstr(js, href, &len);
491 }
492 }
493
494 if (!raw || len == 0) return NULL;
495
496 if (len >= 7 && strncmp(raw, "file://", 7) == 0) {
497 const char *p = raw + 7;
498 if (strncmp(p, "localhost/", 10) == 0) p += 9;
499 if (*p == '\0') return NULL;
500 return strndup(p, len - (size_t)(p - raw));
501 }
502
503 return strndup(raw, len);
504}
505
506static int wt_spawn_worker(
507 ant_worker_thread_t *wt,
508 const char *script_path,
509 const char *worker_data_json,
510 const char *env_store_json
511) {
512 if (!wt || !wt->js || !script_path || !rt || !rt->argv || rt->argc <= 0) return UV_EINVAL;
513
514 uv_loop_t *loop = uv_default_loop();
515 uv_pipe_init(loop, &wt->stdout_pipe, 0);
516
517 uv_stdio_container_t stdio[3];
518 stdio[0].flags = UV_IGNORE;
519 stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
520 stdio[1].data.stream = (uv_stream_t *)&wt->stdout_pipe;
521 stdio[2].flags = UV_INHERIT_FD;
522 stdio[2].data.fd = 2;
523
524 char *argv0 = strdup(rt->argv[0]);
525 char *argv1 = strdup(script_path);
526 if (!argv0 || !argv1) {
527 free(argv0);
528 free(argv1);
529 uv_close((uv_handle_t *)&wt->stdout_pipe, NULL);
530 return UV_ENOMEM;
531 }
532
533 char *args[3] = {argv0, argv1, NULL};
534 char **env = wt_build_worker_env(worker_data_json, env_store_json);
535 if (!env) {
536 free(argv0);
537 free(argv1);
538 uv_close((uv_handle_t *)&wt->stdout_pipe, NULL);
539 return UV_ENOMEM;
540 }
541
542 uv_process_options_t options;
543 memset(&options, 0, sizeof(options));
544 options.file = argv0;
545 options.args = args;
546 options.env = env;
547 options.stdio_count = 3;
548 options.stdio = stdio;
549 options.exit_cb = wt_on_process_exit;
550
551 wt->process.data = wt;
552 wt->stdout_pipe.data = wt;
553
554 int rc = uv_spawn(loop, &wt->process, &options);
555
556 wt_free_env(env);
557 free(argv0);
558 free(argv1);
559
560 if (rc != 0) {
561 uv_close((uv_handle_t *)&wt->stdout_pipe, NULL);
562 return rc;
563 }
564
565 wt->spawned = true;
566 wt->refed = true;
567
568 rc = uv_read_start((uv_stream_t *)&wt->stdout_pipe, wt_alloc_cb, wt_read_cb);
569 if (rc != 0) {
570 uv_process_kill(&wt->process, SIGTERM);
571 }
572 return rc;
573}
574
575static ant_value_t worker_threads_worker_on(ant_t *js, ant_value_t *args, int nargs) {
576 if (nargs < 2 || vtype(args[0]) != T_STR || !is_callable(args[1])) {
577 return js_mkerr(js, "Worker.on(event, listener) requires (string, function)");
578 }
579
580 ant_value_t this_obj = js_getthis(js);
581 if (!is_object_type(this_obj)) return js_mkerr(js, "invalid Worker receiver");
582
583 size_t len = 0;
584 const char *event = js_getstr(js, args[0], &len);
585 if (!event) return js_mkerr(js, "invalid event name");
586
587 if (len == 7 && memcmp(event, "message", 7) == 0) {
588 js_set_slot(this_obj, SLOT_WT_ON_MESSAGE, args[1]);
589 } else if (len == 4 && memcmp(event, "exit", 4) == 0) {
590 js_set_slot(this_obj, SLOT_WT_ON_EXIT, args[1]);
591 }
592
593 return this_obj;
594}
595
596static ant_value_t worker_threads_worker_once(ant_t *js, ant_value_t *args, int nargs) {
597 if (nargs < 2 || vtype(args[0]) != T_STR || !is_callable(args[1])) {
598 return js_mkerr(js, "Worker.once(event, listener) requires (string, function)");
599 }
600
601 ant_value_t this_obj = js_getthis(js);
602 if (!is_object_type(this_obj)) return js_mkerr(js, "invalid Worker receiver");
603
604 size_t len = 0;
605 const char *event = js_getstr(js, args[0], &len);
606 if (!event) return js_mkerr(js, "invalid event name");
607
608 if (len == 7 && memcmp(event, "message", 7) == 0) {
609 js_set_slot(this_obj, SLOT_WT_ONCE_MESSAGE, args[1]);
610 } else if (len == 4 && memcmp(event, "exit", 4) == 0) {
611 js_set_slot(this_obj, SLOT_WT_ONCE_EXIT, args[1]);
612 }
613
614 return this_obj;
615}
616
617static ant_value_t worker_threads_worker_unref(ant_t *js, ant_value_t *args, int nargs) {
618 ant_value_t this_obj = js_getthis(js);
619 ant_worker_thread_t *wt = wt_get_worker(js, this_obj);
620 if (!wt) return js_mkerr(js, "invalid Worker receiver");
621 if (wt->spawned && wt->refed) {
622 if (!uv_is_closing((uv_handle_t *)&wt->process)) uv_unref((uv_handle_t *)&wt->process);
623 if (!uv_is_closing((uv_handle_t *)&wt->stdout_pipe)) uv_unref((uv_handle_t *)&wt->stdout_pipe);
624 wt->refed = false;
625 }
626 return this_obj;
627}
628
629static ant_value_t worker_threads_worker_ref(ant_t *js, ant_value_t *args, int nargs) {
630 ant_value_t this_obj = js_getthis(js);
631 ant_worker_thread_t *wt = wt_get_worker(js, this_obj);
632 if (!wt) return js_mkerr(js, "invalid Worker receiver");
633 if (wt->spawned && !wt->refed) {
634 if (!uv_is_closing((uv_handle_t *)&wt->process)) uv_ref((uv_handle_t *)&wt->process);
635 if (!uv_is_closing((uv_handle_t *)&wt->stdout_pipe)) uv_ref((uv_handle_t *)&wt->stdout_pipe);
636 wt->refed = true;
637 }
638 return this_obj;
639}
640
641static ant_value_t worker_threads_worker_terminate(ant_t *js, ant_value_t *args, int nargs) {
642 ant_value_t this_obj = js_getthis(js);
643 ant_worker_thread_t *wt = wt_get_worker(js, this_obj);
644 if (!wt) return js_mkerr(js, "invalid Worker receiver");
645
646 if (wt->exited) return wt_make_resolved_promise(js, js_mknum((double)wt->exit_status));
647
648 if (!wt->has_terminate_val) {
649 ant_value_t p = js_mkpromise(js);
650 wt->terminate_val = p;
651 wt->has_terminate_val = true;
652 }
653
654 int rc = uv_process_kill(&wt->process, SIGTERM);
655 if (rc != 0) {
656 ant_value_t p = wt->terminate_val;
657 js_reject_promise(js, p, js_mkerr(js, "terminate failed: %s", uv_strerror(rc)));
658 wt->terminate_val = js_mkundef();
659 wt->has_terminate_val = false;
660 return p;
661 }
662
663 return wt->terminate_val;
664}
665
666static ant_value_t worker_threads_worker_post_message(ant_t *js, ant_value_t *args, int nargs) {
667 return js_mkerr(js, "Worker.postMessage is not implemented yet");
668}
669
670static ant_value_t worker_threads_message_port_post_message(ant_t *js, ant_value_t *args, int nargs) {
671 ant_value_t this_obj = js_getthis(js);
672 if (!wt_is_message_port(js, this_obj)) return js_mkerr(js, "invalid MessagePort receiver");
673 if (wt_port_is_closed(js, this_obj)) return js_mkundef();
674
675 ant_value_t peer = js_get_slot(this_obj, SLOT_WT_PORT_PEER);
676 if (!wt_is_message_port(js, peer) || wt_port_is_closed(js, peer)) return js_mkundef();
677
678 ant_value_t value = (nargs > 0) ? args[0] : js_mkundef();
679 wt_port_queue_push(js, peer, value);
680 wt_port_drain(js, peer);
681 return js_mkundef();
682}
683
684static ant_value_t worker_threads_message_port_on(ant_t *js, ant_value_t *args, int nargs) {
685 if (nargs < 2 || vtype(args[0]) != T_STR || !is_callable(args[1])) {
686 return js_mkerr(js, "MessagePort.on(event, listener) requires (string, function)");
687 }
688 ant_value_t this_obj = js_getthis(js);
689 if (!wt_is_message_port(js, this_obj)) return js_mkerr(js, "invalid MessagePort receiver");
690
691 size_t len = 0;
692 const char *event = js_getstr(js, args[0], &len);
693 if (!event) return js_mkerr(js, "invalid event name");
694 if (len == 7 && memcmp(event, "message", 7) == 0) {
695 js_set_slot(this_obj, SLOT_WT_PORT_ON_MESSAGE, args[1]);
696 js_set_slot(this_obj, SLOT_WT_PORT_STARTED, js_true);
697 wt_port_drain(js, this_obj);
698 }
699 return this_obj;
700}
701
702static ant_value_t worker_threads_message_port_once(ant_t *js, ant_value_t *args, int nargs) {
703 if (nargs < 2 || vtype(args[0]) != T_STR || !is_callable(args[1])) {
704 return js_mkerr(js, "MessagePort.once(event, listener) requires (string, function)");
705 }
706 ant_value_t this_obj = js_getthis(js);
707 if (!wt_is_message_port(js, this_obj)) return js_mkerr(js, "invalid MessagePort receiver");
708
709 size_t len = 0;
710 const char *event = js_getstr(js, args[0], &len);
711 if (!event) return js_mkerr(js, "invalid event name");
712 if (len == 7 && memcmp(event, "message", 7) == 0) {
713 js_set_slot(this_obj, SLOT_WT_PORT_ONCE_MESSAGE, args[1]);
714 js_set_slot(this_obj, SLOT_WT_PORT_STARTED, js_true);
715 wt_port_drain(js, this_obj);
716 }
717 return this_obj;
718}
719
720static ant_value_t worker_threads_message_port_start(ant_t *js, ant_value_t *args, int nargs) {
721 ant_value_t this_obj = js_getthis(js);
722 if (!wt_is_message_port(js, this_obj)) return js_mkerr(js, "invalid MessagePort receiver");
723 js_set_slot(this_obj, SLOT_WT_PORT_STARTED, js_true);
724 wt_port_drain(js, this_obj);
725 return js_mkundef();
726}
727
728static ant_value_t worker_threads_message_port_close(ant_t *js, ant_value_t *args, int nargs) {
729 ant_value_t this_obj = js_getthis(js);
730 if (!wt_is_message_port(js, this_obj)) return js_mkerr(js, "invalid MessagePort receiver");
731 wt_port_set_closed(js, this_obj, true);
732
733 ant_value_t peer = js_get_slot(this_obj, SLOT_WT_PORT_PEER);
734 js_set_slot(this_obj, SLOT_WT_PORT_PEER, js_mknull());
735 if (wt_is_message_port(js, peer)) js_set_slot(peer, SLOT_WT_PORT_PEER, js_mknull());
736 return js_mkundef();
737}
738
739static ant_value_t worker_threads_message_port_ref(ant_t *js, ant_value_t *args, int nargs) {
740 return js_mkundef();
741}
742
743static ant_value_t worker_threads_message_port_unref(ant_t *js, ant_value_t *args, int nargs) {
744 return js_mkundef();
745}
746
747static ant_value_t worker_threads_message_port_ctor(ant_t *js, ant_value_t *args, int nargs) {
748 return js_mkerr(js, "MessagePort constructor is not public");
749}
750
751static ant_value_t worker_threads_message_channel_ctor(ant_t *js, ant_value_t *args, int nargs) {
752 if (vtype(js->new_target) == T_UNDEF) {
753 return js_mkerr(js, "MessageChannel constructor requires 'new'");
754 }
755
756 ant_value_t this_obj = js_getthis(js);
757 ant_value_t port1 = wt_make_message_port(js);
758 ant_value_t port2 = wt_make_message_port(js);
759 js_set_slot(port1, SLOT_WT_PORT_PEER, port2);
760 js_set_slot(port2, SLOT_WT_PORT_PEER, port1);
761 js_set(js, this_obj, "port1", port1);
762 js_set(js, this_obj, "port2", port2);
763 return this_obj;
764}
765
766static ant_value_t worker_threads_worker_ctor(ant_t *js, ant_value_t *args, int nargs) {
767 ant_value_t this_obj = js_getthis(js);
768 ant_value_t proto = js_instance_proto_from_new_target(js, js_mkundef());
769
770 if (vtype(js->new_target) == T_UNDEF) {
771 return js_mkerr(js, "Worker constructor requires 'new'");
772 }
773 if (nargs < 1) return js_mkerr(js, "Worker() requires a filename or URL");
774 if (is_object_type(this_obj) && is_object_type(proto)) js_set_proto_init(this_obj, proto);
775
776 char *script_path = wt_path_from_specifier(js, args[0]);
777 if (!script_path) return js_mkerr(js, "Invalid Worker filename/URL");
778
779 const char *worker_data_json = NULL;
780 const char *env_store_json = NULL;
781 char *worker_data_heap = NULL;
782 char *env_store_heap = NULL;
783 if (nargs >= 2 && is_object_type(args[1])) {
784 ant_value_t worker_data = js_get(js, args[1], "workerData");
785 if (!is_undefined(worker_data)) {
786 ant_value_t stringify_args[1] = {worker_data};
787 ant_value_t json = js_json_stringify(js, stringify_args, 1);
788 if (vtype(json) != T_STR) {
789 free(script_path);
790 return js_mkerr(js, "Worker options.workerData must be JSON-serializable");
791 }
792 size_t len = 0;
793 const char *raw = js_getstr(js, json, &len);
794 if (!raw) {
795 free(script_path);
796 return js_mkerr(js, "Failed to stringify workerData");
797 }
798 worker_data_heap = strndup(raw, len);
799 if (!worker_data_heap) {
800 free(script_path);
801 return js_mkerr(js, "Out of memory");
802 }
803 worker_data_json = worker_data_heap;
804 }
805 }
806
807 ant_value_t env_store = wt_get_or_create_env_store(js);
808 ant_value_t env_stringify_args[1] = {env_store};
809 ant_value_t env_json = js_json_stringify(js, env_stringify_args, 1);
810 if (vtype(env_json) != T_STR) {
811 free(script_path);
812 free(worker_data_heap);
813 return js_mkerr(js, "setEnvironmentData values must be JSON-serializable");
814 }
815 size_t env_len = 0;
816 const char *env_raw = js_getstr(js, env_json, &env_len);
817 if (!env_raw) {
818 free(script_path);
819 free(worker_data_heap);
820 return js_mkerr(js, "Failed to snapshot environment data");
821 }
822 env_store_heap = strndup(env_raw, env_len);
823 if (!env_store_heap) {
824 free(script_path);
825 free(worker_data_heap);
826 return js_mkerr(js, "Out of memory");
827 }
828 env_store_json = env_store_heap;
829
830 ant_worker_thread_t *wt = (ant_worker_thread_t *)calloc(1, sizeof(*wt));
831 if (!wt) {
832 free(script_path);
833 free(worker_data_heap);
834 free(env_store_heap);
835 return js_mkerr(js, "Out of memory");
836 }
837
838 wt->js = js;
839 wt->self_val = this_obj;
840 js_set_native(this_obj, wt, WORKER_NATIVE_TAG);
841
842 wt->prev = NULL;
843 wt->next = active_workers_head;
844 if (active_workers_head) active_workers_head->prev = wt;
845 active_workers_head = wt;
846
847 int rc = wt_spawn_worker(wt, script_path, worker_data_json, env_store_json);
848 free(script_path);
849 free(worker_data_heap);
850 free(env_store_heap);
851
852 if (rc != 0) {
853 js_set_slot(this_obj, SLOT_DATA, js_mkundef());
854 wt_cleanup(wt);
855 return js_mkerr(js, "Failed to spawn Worker: %s", uv_strerror(rc));
856 }
857
858 return this_obj;
859}
860
861static ant_value_t worker_threads_parent_post_message(ant_t *js, ant_value_t *args, int nargs) {
862 ant_value_t value = (nargs > 0) ? args[0] : js_mkundef();
863 ant_value_t stringify_args[1] = {value};
864 ant_value_t json = js_json_stringify(js, stringify_args, 1);
865 if (vtype(json) != T_STR) return js_mkerr(js, "parentPort.postMessage payload must be JSON-serializable");
866
867 size_t json_len = 0;
868 const char *json_str = js_getstr(js, json, &json_len);
869 if (!json_str) return js_mkerr(js, "Failed to serialize message");
870
871 size_t prefix_len = strlen(WT_MSG_PREFIX);
872 size_t line_len = prefix_len + json_len + 1;
873 char *line = (char *)malloc(line_len + 1);
874 if (!line) return js_mkerr(js, "Out of memory");
875
876 memcpy(line, WT_MSG_PREFIX, prefix_len);
877 memcpy(line + prefix_len, json_str, json_len);
878 line[prefix_len + json_len] = '\n';
879 line[prefix_len + json_len + 1] = '\0';
880
881 ssize_t wrote = WT_WRITE(WT_STDOUT_FD, line, (unsigned int)line_len);
882 free(line);
883 if (wrote < 0) return js_mkerr(js, "parentPort.postMessage failed");
884 return js_mkundef();
885}
886
887static ant_value_t worker_threads_parent_unref(ant_t *js, ant_value_t *args, int nargs) {
888 return js_mkundef();
889}
890
891static ant_value_t worker_threads_mark_as_untransferable(ant_t *js, ant_value_t *args, int nargs) {
892 if (nargs < 1) return js_mkundef();
893 return args[0];
894}
895
896static ant_value_t worker_threads_receive_message_on_port(ant_t *js, ant_value_t *args, int nargs) {
897 if (nargs < 1 || !wt_is_message_port(js, args[0])) {
898 return js_mkerr(js, "receiveMessageOnPort(port) requires a MessagePort");
899 }
900 ant_value_t msg = js_mkundef();
901 if (!wt_port_queue_dequeue(js, args[0], &msg)) return js_mkundef();
902
903 ant_value_t out = js_mkobj(js);
904 js_set(js, out, "message", msg);
905 return out;
906}
907
908static ant_value_t worker_threads_set_environment_data(ant_t *js, ant_value_t *args, int nargs) {
909 if (nargs < 2) return js_mkerr(js, "setEnvironmentData(key, value) requires 2 arguments");
910
911 ant_value_t key_stringify_args[1] = {args[0]};
912 ant_value_t key_json = js_json_stringify(js, key_stringify_args, 1);
913 if (vtype(key_json) != T_STR) return js_mkerr(js, "setEnvironmentData key must be JSON-serializable");
914
915 ant_value_t value_stringify_args[1] = {args[1]};
916 ant_value_t value_json = js_json_stringify(js, value_stringify_args, 1);
917 if (vtype(value_json) != T_STR) return js_mkerr(js, "setEnvironmentData value must be JSON-serializable");
918
919 ant_value_t cloned = json_parse_value(js, value_json);
920 if (is_err(cloned)) return js_mkerr(js, "setEnvironmentData value must be JSON-serializable");
921
922 size_t key_len = 0;
923 const char *key_ptr = js_getstr(js, key_json, &key_len);
924 if (!key_ptr) return js_mkerr(js, "Failed to serialize environment data key");
925
926 char *key = strndup(key_ptr, key_len);
927 if (!key) return js_mkerr(js, "Out of memory");
928 js_set(js, wt_get_or_create_env_store(js), key, cloned);
929 free(key);
930 return js_mkundef();
931}
932
933static ant_value_t worker_threads_get_environment_data(ant_t *js, ant_value_t *args, int nargs) {
934 if (nargs < 1) return js_mkundef();
935
936 ant_value_t key_stringify_args[1] = {args[0]};
937 ant_value_t key_json = js_json_stringify(js, key_stringify_args, 1);
938 if (vtype(key_json) != T_STR) return js_mkundef();
939
940 size_t key_len = 0;
941 const char *key_ptr = js_getstr(js, key_json, &key_len);
942 if (!key_ptr) return js_mkundef();
943
944 char *key = strndup(key_ptr, key_len);
945 if (!key) return js_mkerr(js, "Out of memory");
946
947 ant_value_t store = wt_get_or_create_env_store(js);
948 ant_offset_t off = lkp(js, store, key, key_len);
949 if (off == 0) {
950 free(key);
951 return js_mkundef();
952 }
953
954 ant_value_t value = js_get(js, store, key);
955 free(key);
956 return value;
957}
958
959static ant_value_t worker_threads_move_message_port_to_context(ant_t *js, ant_value_t *args, int nargs) {
960 if (nargs < 1 || !wt_is_message_port(js, args[0])) {
961 return js_mkerr(js, "moveMessagePortToContext(port, context) requires a MessagePort");
962 }
963 return args[0];
964}
965
966void gc_mark_worker_threads(ant_t *js, gc_mark_fn mark) {
967 for (ant_worker_thread_t *wt = active_workers_head; wt; wt = wt->next) {
968 mark(js, wt->self_val);
969 if (wt->has_terminate_val) mark(js, wt->terminate_val);
970 }
971}
972
973ant_value_t worker_threads_library(ant_t *js) {
974 ant_value_t lib = js_mkobj(js);
975 bool is_worker = wt_is_worker_mode();
976
977 wt_init_env_store(js, is_worker);
978
979 js_set(js, lib, "isMainThread", js_bool(!is_worker));
980 js_set(js, lib, "threadId", js_mknum((double)(is_worker ? rt->pid : 0)));
981 js_set(js, lib, "SHARE_ENV", js_mksym(js, "SHARE_ENV"));
982
983 ant_value_t message_port_ctor_obj = js_mkobj(js);
984 ant_value_t message_port_proto = js_mkobj(js);
985 js_set(js, message_port_proto, "postMessage", js_mkfun(worker_threads_message_port_post_message));
986 js_set(js, message_port_proto, "on", js_mkfun(worker_threads_message_port_on));
987 js_set(js, message_port_proto, "once", js_mkfun(worker_threads_message_port_once));
988 js_set(js, message_port_proto, "start", js_mkfun(worker_threads_message_port_start));
989 js_set(js, message_port_proto, "close", js_mkfun(worker_threads_message_port_close));
990 js_set(js, message_port_proto, "ref", js_mkfun(worker_threads_message_port_ref));
991 js_set(js, message_port_proto, "unref", js_mkfun(worker_threads_message_port_unref));
992 js_set_sym(js, message_port_proto, get_toStringTag_sym(), js_mkstr(js, "MessagePort", 11));
993
994 js_set_slot(message_port_ctor_obj, SLOT_CFUNC, js_mkfun(worker_threads_message_port_ctor));
995 js_mkprop_fast(js, message_port_ctor_obj, "prototype", 9, message_port_proto);
996 js_mkprop_fast(js, message_port_ctor_obj, "name", 4, js_mkstr(js, "MessagePort", 11));
997 js_set_descriptor(js, message_port_ctor_obj, "name", 4, 0);
998 js_set(js, lib, "MessagePort", js_obj_to_func(message_port_ctor_obj));
999 js_set_slot(js->global, SLOT_WT_PORT_PROTO, message_port_proto);
1000
1001 ant_value_t message_channel_ctor_obj = js_mkobj(js);
1002 ant_value_t message_channel_proto = js_mkobj(js);
1003 js_set_sym(js, message_channel_proto, get_toStringTag_sym(), js_mkstr(js, "MessageChannel", 14));
1004 js_set_slot(message_channel_ctor_obj, SLOT_CFUNC, js_mkfun(worker_threads_message_channel_ctor));
1005 js_mkprop_fast(js, message_channel_ctor_obj, "prototype", 9, message_channel_proto);
1006 js_mkprop_fast(js, message_channel_ctor_obj, "name", 4, js_mkstr(js, "MessageChannel", 14));
1007 js_set_descriptor(js, message_channel_ctor_obj, "name", 4, 0);
1008 js_set(js, lib, "MessageChannel", js_obj_to_func(message_channel_ctor_obj));
1009
1010 if (is_worker) {
1011 ant_value_t parent_port = js_mkobj(js);
1012 js_set(js, parent_port, "postMessage", js_mkfun(worker_threads_parent_post_message));
1013 js_set(js, parent_port, "unref", js_mkfun(worker_threads_parent_unref));
1014 js_set(js, parent_port, "ref", js_mkfun(worker_threads_parent_unref));
1015 js_set(js, lib, "parentPort", parent_port);
1016
1017 const char *worker_data_json = getenv(WT_ENV_DATA_JSON);
1018 if (worker_data_json && worker_data_json[0]) {
1019 ant_value_t raw = js_mkstr(js, worker_data_json, strlen(worker_data_json));
1020 ant_value_t parsed = json_parse_value(js, raw);
1021 js_set(js, lib, "workerData", is_err(parsed) ? js_mkundef() : parsed);
1022 } else js_set(js, lib, "workerData", js_mkundef());
1023 } else {
1024 js_set(js, lib, "parentPort", js_mknull());
1025 js_set(js, lib, "workerData", js_mkundef());
1026 }
1027
1028 ant_value_t worker_ctor_obj = js_mkobj(js);
1029 ant_value_t worker_proto = js_mkobj(js);
1030
1031 js_set(js, worker_proto, "on", js_mkfun(worker_threads_worker_on));
1032 js_set(js, worker_proto, "once", js_mkfun(worker_threads_worker_once));
1033 js_set(js, worker_proto, "terminate", js_mkfun(worker_threads_worker_terminate));
1034 js_set(js, worker_proto, "unref", js_mkfun(worker_threads_worker_unref));
1035 js_set(js, worker_proto, "ref", js_mkfun(worker_threads_worker_ref));
1036 js_set(js, worker_proto, "postMessage", js_mkfun(worker_threads_worker_post_message));
1037 js_set_sym(js, worker_proto, get_toStringTag_sym(), js_mkstr(js, "Worker", 6));
1038
1039 js_set_slot(worker_ctor_obj, SLOT_CFUNC, js_mkfun(worker_threads_worker_ctor));
1040 js_mkprop_fast(js, worker_ctor_obj, "prototype", 9, worker_proto);
1041 js_mkprop_fast(js, worker_ctor_obj, "name", 4, js_mkstr(js, "Worker", 6));
1042 js_set_descriptor(js, worker_ctor_obj, "name", 4, 0);
1043 js_set(js, lib, "Worker", js_obj_to_func(worker_ctor_obj));
1044
1045 js_set(js, lib, "markAsUntransferable", js_mkfun(worker_threads_mark_as_untransferable));
1046 js_set(js, lib, "receiveMessageOnPort", js_mkfun(worker_threads_receive_message_on_port));
1047 js_set(js, lib, "setEnvironmentData", js_mkfun(worker_threads_set_environment_data));
1048 js_set(js, lib, "getEnvironmentData", js_mkfun(worker_threads_get_environment_data));
1049 js_set(js, lib, "moveMessagePortToContext", js_mkfun(worker_threads_move_message_port_to_context));
1050
1051 js_set_sym(js, lib, get_toStringTag_sym(), js_mkstr(js, "worker_threads", 14));
1052 return lib;
1053}