Offload functions to worker threads with shared memory primitives for Node.js.
1import { registry, isModuleFrozen } from './registry.ts';
2import { PromiseLikeTask } from './task.ts';
3import { AsyncIterableTask } from './stream-task.ts';
4import type { Task } from './runner.ts';
5
6const counters = new Map<string, number>();
7
8/**
9 * Wraps a function to run on a worker thread. Must be called at module scope.
10 * @param importMeta - The `import.meta` of the calling module, used to identify the source file.
11 * @param fn - The function to offload to a worker thread.
12 * @returns A callable that creates a {@link Task} when invoked.
13 */
14
15/** A value or a Task that resolves to that value on the worker. */
16export type Arg<T> = T | Task<T>;
17
18type TaskableArgs<A extends unknown[]> = { [K in keyof A]: Arg<A[K]> };
19
20type Moroutine<A extends unknown[], R> = {
21 (...args: A): Task<Awaited<R>>;
22 (...args: TaskableArgs<A>): Task<Awaited<R>>;
23};
24
25type AsyncIterableMoroutine<A extends unknown[], Y> = {
26 (...args: A): Task<AsyncIterable<Y>>;
27 (...args: TaskableArgs<A>): Task<AsyncIterable<Y>>;
28};
29
30type IsNever<T> = [T] extends [never] ? true : false;
31type MoReturn<A extends unknown[], R> =
32 IsNever<R> extends true
33 ? Moroutine<A, R>
34 : [R] extends [AsyncGenerator<infer Y, any, any>]
35 ? AsyncIterableMoroutine<A, Y>
36 : Moroutine<A, R>;
37
38function isAsyncGeneratorFunction(fn: Function): boolean {
39 return fn.constructor.name === 'AsyncGeneratorFunction';
40}
41
42export function mo<A extends unknown[], R>(importMeta: ImportMeta, fn: (...args: A) => R): MoReturn<A, R> {
43 const url = importMeta.url;
44
45 if (isModuleFrozen(url)) {
46 throw new Error(
47 `Cannot call mo() for ${url} after a task from this module has been dispatched. ` +
48 'All mo() calls must happen at module load time.',
49 );
50 }
51
52 const index = counters.get(url) ?? 0;
53 counters.set(url, index + 1);
54 const id = `${url}#${index}`;
55
56 registry.set(id, fn);
57
58 if (isAsyncGeneratorFunction(fn)) {
59 return ((...args: unknown[]) => new AsyncIterableTask(id, args)) as any;
60 }
61 return ((...args: unknown[]) => new PromiseLikeTask<R>(id, args)) as any;
62}