Offload functions to worker threads with shared memory primitives for Node.js.
1import { registry, isModuleFrozen } from './registry.ts';
2import { PromiseLikeTask } from './task.ts';
3import { AsyncIterableTask } from './stream-task.ts';
4import type { Task } from './runner.ts';
5
6const counters = new Map<string, number>();
7
8/**
9 * Wraps a function to run on a worker thread. Must be called at module scope.
10 * @param importMeta - The `import.meta` of the calling module, used to identify the source file.
11 * @param fn - The function to offload to a worker thread.
12 * @returns A callable that creates a {@link Task} when invoked.
13 */
14
15/** A value or a Task that resolves to that value on the worker. */
16export type Arg<T> = T | Task<T>;
17
18type TaskableArgs<A extends unknown[]> = { [K in keyof A]: Arg<A[K]> };
19
20type Moroutine<A extends unknown[], R> = {
21 (...args: A): Task<Awaited<R>, A> & PromiseLike<Awaited<R>>;
22 (...args: TaskableArgs<A>): Task<Awaited<R>, A> & PromiseLike<Awaited<R>>;
23 /** Stable id tying tasks back to this moroutine. Used by {@link isTask}. */
24 readonly id: string;
25};
26
27type AsyncIterableMoroutine<A extends unknown[], Y> = {
28 (...args: A): Task<AsyncIterable<Y>, A> & AsyncIterable<Y>;
29 (...args: TaskableArgs<A>): Task<AsyncIterable<Y>, A> & AsyncIterable<Y>;
30 /** Stable id tying tasks back to this moroutine. Used by {@link isTask}. */
31 readonly id: string;
32};
33
34type IsNever<T> = [T] extends [never] ? true : false;
35type MoReturn<A extends unknown[], R> =
36 IsNever<R> extends true
37 ? Moroutine<A, R>
38 : [R] extends [AsyncGenerator<infer Y, any, any>]
39 ? AsyncIterableMoroutine<A, Y>
40 : Moroutine<A, R>;
41
42function isAsyncGeneratorFunction(fn: Function): boolean {
43 return fn.constructor.name === 'AsyncGeneratorFunction';
44}
45
46export function mo<A extends unknown[], R>(importMeta: ImportMeta, fn: (...args: A) => R): MoReturn<A, R> {
47 const url = importMeta.url;
48
49 if (isModuleFrozen(url)) {
50 throw new Error(
51 `Cannot call mo() for ${url} after a task from this module has been dispatched. ` +
52 'All mo() calls must happen at module load time.',
53 );
54 }
55
56 const index = counters.get(url) ?? 0;
57 counters.set(url, index + 1);
58 const id = `${url}#${index}`;
59
60 registry.set(id, fn);
61
62 const factory = isAsyncGeneratorFunction(fn)
63 ? (...args: unknown[]) => new AsyncIterableTask(id, args)
64 : (...args: unknown[]) => new PromiseLikeTask<R>(id, args);
65 Object.defineProperty(factory, 'id', { value: id, enumerable: true });
66 return factory as unknown as MoReturn<A, R>;
67}