Offload functions to worker threads with shared memory primitives for Node.js.
1import { registry, isModuleFrozen } from './registry.ts';
2import { Task } from './task.ts';
3import { StreamTask } from './stream-task.ts';
4
/**
 * Number of `mo()` calls seen so far for each module, keyed by the module's
 * `import.meta.url`. The current count is used as the index of the next id.
 */
const counters: Map<string, number> = new Map();
6
/**
 * `mo()` wraps a function so that it runs on a worker thread. It must be called at module scope.
 * @param importMeta - The `import.meta` of the calling module, used to identify the source file.
 * @param fn - The function to offload to a worker thread.
 * @returns A callable that creates a {@link Task} (or {@link StreamTask} for `async function*`) when invoked.
 */
13
/**
 * An argument that may be supplied either directly as a `T`, or as a
 * {@link Task} that resolves to a `T` on the worker.
 */
export type Arg<T> = Task<T> | T;
16
/**
 * Maps every element type of the argument list `A` to {@link Arg} of that
 * element type, so each position accepts either the value or a task for it.
 */
type TaskableArgs<A extends unknown[]> = {
  [Index in keyof A]: Arg<A[Index]>;
};
18
/**
 * Callable shape for a wrapped function that takes arguments `A` and returns `R`.
 * Both overloads produce a `Task` of the awaited return type.
 */
type Moroutine<A extends unknown[], R> = {
  /** Invocation with the function's own argument types. */
  (...values: A): Task<Awaited<R>>;
  /** Invocation where any argument may instead be a task (see `TaskableArgs`). */
  (...valuesOrTasks: TaskableArgs<A>): Task<Awaited<R>>;
};
23
/**
 * Callable shape for a wrapped function that takes arguments `A` and yields
 * values of type `Y`. Both overloads produce a `StreamTask<Y>`.
 */
type StreamMoroutine<A extends unknown[], Y> = {
  /** Invocation with the function's own argument types. */
  (...values: A): StreamTask<Y>;
  /** Invocation where any argument may instead be a task (see `TaskableArgs`). */
  (...valuesOrTasks: TaskableArgs<A>): StreamTask<Y>;
};
28
/**
 * Resolves to `true` when `Candidate` is exactly `never`, otherwise `false`.
 * The one-element tuple wrappers stop the conditional from distributing over
 * `Candidate`, which would otherwise make a `never` input produce `never`.
 */
type IsNever<Candidate> = [Candidate] extends [never] ? true : false;
/**
 * Picks the callable type for a function with arguments `A` and return type `R`:
 * `StreamMoroutine` when `R` is an `AsyncGenerator`, `Moroutine` otherwise.
 *
 * `never` is handled first because `[never]` would also satisfy the
 * `AsyncGenerator` check below; a function returning `never` gets `Moroutine`.
 */
type MoReturn<A extends unknown[], R> =
  IsNever<R> extends true
    ? Moroutine<A, R>
    : [R] extends [AsyncGenerator<infer Yielded, any, any>]
      ? StreamMoroutine<A, Yielded>
      : Moroutine<A, R>;
36
/**
 * Reports whether `fn`'s constructor is named `AsyncGeneratorFunction`, which
 * is the case for functions written with `async function*`.
 *
 * @param fn - The function to inspect.
 * @returns `true` if the constructor name matches, `false` otherwise.
 */
function isAsyncGeneratorFunction(fn: Function): boolean {
  const { name: constructorName } = fn.constructor;
  return constructorName === 'AsyncGeneratorFunction';
}
40
/**
 * Registers `fn` so it can be offloaded to a worker thread, and returns a
 * callable that creates a task for it on each invocation.
 *
 * Every call takes the next index for `importMeta.url`, stores `fn` in the
 * registry under the id `<url>#<index>`, and advances that module's counter.
 * The returned callable builds a {@link StreamTask} when `fn` is an
 * `async function*`, and a {@link Task} otherwise, passing the id and the
 * call arguments through unchanged.
 *
 * @param importMeta - The `import.meta` of the calling module; its `url` identifies the source file.
 * @param fn - The function to offload.
 * @returns A callable producing a {@link Task} or {@link StreamTask} per invocation.
 * @throws Error when `isModuleFrozen` reports the calling module as frozen,
 *   i.e. a task from that module has already been dispatched.
 */
export function mo<A extends unknown[], R>(importMeta: ImportMeta, fn: (...args: A) => R): MoReturn<A, R> {
  const { url } = importMeta;

  if (isModuleFrozen(url)) {
    const message =
      `Cannot call mo() for ${url} after a task from this module has been dispatched. ` +
      'All mo() calls must happen at module load time.';
    throw new Error(message);
  }

  // The id is the module URL plus the zero-based position of this mo() call
  // among all mo() calls made for that module so far.
  const ordinal = counters.get(url) ?? 0;
  const id = `${url}#${ordinal}`;
  counters.set(url, ordinal + 1);

  registry.set(id, fn);

  // Decide once, at registration time, which kind of task the callable builds.
  const createTask = isAsyncGeneratorFunction(fn)
    ? (...args: unknown[]) => new StreamTask(id, args)
    : (...args: unknown[]) => new Task<R>(id, args);

  return createTask as any;
}