Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at e46aca05dbf9a2204886d23f7ae3b34883f4e5c6 62 lines 2.1 kB view raw
import { registry, isModuleFrozen } from './registry.ts';
import { PromiseLikeTask } from './task.ts';
import { AsyncIterableTask } from './stream-task.ts';
import type { Task } from './runner.ts';

/** Per-module count of `mo()` calls made so far, keyed by module URL; supplies the next registration index. */
const counters = new Map<string, number>();

/** A value or a Task that resolves to that value on the worker. */
export type Arg<T> = T | Task<T>;

/** Maps every parameter in `A` to {@link Arg}, so each argument may be a plain value or a Task. */
type TaskableArgs<A extends unknown[]> = { [K in keyof A]: Arg<A[K]> };

/**
 * Callable shape returned by `mo()` for ordinary functions: invoked with plain
 * arguments or Task-wrapped arguments, it yields a Task of the awaited return type.
 */
type Moroutine<A extends unknown[], R> = {
  (...args: A): Task<Awaited<R>>;
  (...args: TaskableArgs<A>): Task<Awaited<R>>;
};

/**
 * Callable shape returned by `mo()` for async generator functions: the Task
 * carries an `AsyncIterable` of the yielded values.
 */
type AsyncIterableMoroutine<A extends unknown[], Y> = {
  (...args: A): Task<AsyncIterable<Y>>;
  (...args: TaskableArgs<A>): Task<AsyncIterable<Y>>;
};

/** `true` only when `T` is `never`; the tuple wrapping stops the conditional from distributing. */
type IsNever<T> = [T] extends [never] ? true : false;

/**
 * Picks the callable shape for a wrapped function from its return type `R`.
 * `never` is handled first because it would otherwise match the
 * `AsyncGenerator` branch as well.
 */
type MoReturn<A extends unknown[], R> =
  IsNever<R> extends true
    ? Moroutine<A, R>
    : [R] extends [AsyncGenerator<infer Y, any, any>]
      ? AsyncIterableMoroutine<A, Y>
      : Moroutine<A, R>;

/**
 * Reports whether `fn` was declared with `async function*` syntax, judged by
 * the name of its constructor.
 * @param fn - The candidate value; anything that is not a function yields `false`.
 * @returns `true` when the constructor of `fn` is named `AsyncGeneratorFunction`.
 */
function isAsyncGeneratorFunction(fn: unknown): boolean {
  return typeof fn === 'function' && fn.constructor.name === 'AsyncGeneratorFunction';
}

/**
 * Wraps a function to run on a worker thread. Must be called at module scope.
 *
 * Each call stores `fn` in the registry under the id `<module url>#<index>`,
 * where `index` is the number of earlier `mo()` calls made for the same module
 * URL, so ids follow call order within a module.
 *
 * @param importMeta - The `import.meta` of the calling module, used to identify the source file.
 * @param fn - The function to offload to a worker thread.
 * @returns A callable that creates a {@link Task} when invoked.
 * @throws Error if `isModuleFrozen` reports the calling module as frozen
 *   (per the error message: after a task from that module has been dispatched).
 */
export function mo<A extends unknown[], R>(importMeta: ImportMeta, fn: (...args: A) => R): MoReturn<A, R> {
  const url = importMeta.url;

  if (isModuleFrozen(url)) {
    throw new Error(
      `Cannot call mo() for ${url} after a task from this module has been dispatched. ` +
        'All mo() calls must happen at module load time.',
    );
  }

  // Allocate the next sequential index for this module and derive the registry id from it.
  const index = counters.get(url) ?? 0;
  counters.set(url, index + 1);
  const id = `${url}#${index}`;

  registry.set(id, fn);

  // The `as any` casts below are deliberate: MoReturn is a conditional type over
  // the generic R, which the compiler cannot resolve inside this function body,
  // so the branch taken at runtime cannot be checked against it statically.
  if (isAsyncGeneratorFunction(fn)) {
    return ((...args: unknown[]) => new AsyncIterableTask(id, args)) as any;
  }
  return ((...args: unknown[]) => new PromiseLikeTask<R>(id, args)) as any;
}