Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at e2cebd539403475e2f4e79d55ca1a84a0ce3510d 67 lines 2.5 kB view raw
1import { registry, isModuleFrozen } from './registry.ts'; 2import { PromiseLikeTask } from './task.ts'; 3import { AsyncIterableTask } from './stream-task.ts'; 4import type { Task } from './runner.ts'; 5 6const counters = new Map<string, number>(); 7 8/** 9 * Wraps a function to run on a worker thread. Must be called at module scope. 10 * @param importMeta - The `import.meta` of the calling module, used to identify the source file. 11 * @param fn - The function to offload to a worker thread. 12 * @returns A callable that creates a {@link Task} when invoked. 13 */ 14 15/** A value or a Task that resolves to that value on the worker. */ 16export type Arg<T> = T | Task<T>; 17 18type TaskableArgs<A extends unknown[]> = { [K in keyof A]: Arg<A[K]> }; 19 20type Moroutine<A extends unknown[], R> = { 21 (...args: A): Task<Awaited<R>, A> & PromiseLike<Awaited<R>>; 22 (...args: TaskableArgs<A>): Task<Awaited<R>, A> & PromiseLike<Awaited<R>>; 23 /** Stable id tying tasks back to this moroutine. Used by {@link isTask}. */ 24 readonly id: string; 25}; 26 27type AsyncIterableMoroutine<A extends unknown[], Y> = { 28 (...args: A): Task<AsyncIterable<Y>, A> & AsyncIterable<Y>; 29 (...args: TaskableArgs<A>): Task<AsyncIterable<Y>, A> & AsyncIterable<Y>; 30 /** Stable id tying tasks back to this moroutine. Used by {@link isTask}. */ 31 readonly id: string; 32}; 33 34type IsNever<T> = [T] extends [never] ? true : false; 35type MoReturn<A extends unknown[], R> = 36 IsNever<R> extends true 37 ? Moroutine<A, R> 38 : [R] extends [AsyncGenerator<infer Y, any, any>] 39 ? AsyncIterableMoroutine<A, Y> 40 : Moroutine<A, R>; 41 42function isAsyncGeneratorFunction(fn: Function): boolean { 43 return fn.constructor.name === 'AsyncGeneratorFunction'; 44} 45 46export function mo<A extends unknown[], R>(importMeta: ImportMeta, fn: (...args: A) => R): MoReturn<A, R> { 47 const url = importMeta.url; 48 49 if (isModuleFrozen(url)) { 50 throw new Error( 51 `Cannot call mo() for ${url} after a task from this module has been dispatched. ` + 52 'All mo() calls must happen at module load time.', 53 ); 54 } 55 56 const index = counters.get(url) ?? 0; 57 counters.set(url, index + 1); 58 const id = `${url}#${index}`; 59 60 registry.set(id, fn); 61 62 const factory = isAsyncGeneratorFunction(fn) 63 ? (...args: unknown[]) => new AsyncIterableTask(id, args) 64 : (...args: unknown[]) => new PromiseLikeTask<R>(id, args); 65 Object.defineProperty(factory, 'id', { value: id, enumerable: true }); 66 return factory as unknown as MoReturn<A, R>; 67}