Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at e2dabc1cc0996ad4f54664323123e1ec632e02ea 61 lines 2.0 kB view raw
import { registry, isModuleFrozen } from './registry.ts';
import { Task } from './task.ts';
import { StreamTask } from './stream-task.ts';

/**
 * Number of `mo()` calls seen so far for each module URL.
 * The running count becomes the numeric suffix of each registered function id,
 * so ids are determined purely by call order within a module.
 */
const counters = new Map<string, number>();

/** A value or a Task that resolves to that value on the worker. */
export type Arg<T> = T | Task<T>;

/** Maps an argument tuple to one where every position also accepts a {@link Task} of that type. */
type TaskableArgs<A extends unknown[]> = { [K in keyof A]: Arg<A[K]> };

/**
 * Callable produced by `mo()` for a non-streaming function.
 * Accepts either plain arguments or arguments wrapped in Tasks.
 */
type Moroutine<A extends unknown[], R> = {
  (...args: A): Task<Awaited<R>>;
  (...args: TaskableArgs<A>): Task<Awaited<R>>;
};

/**
 * Callable produced by `mo()` for an `async function*`.
 * Accepts either plain arguments or arguments wrapped in Tasks.
 */
type StreamMoroutine<A extends unknown[], Y> = {
  (...args: A): StreamTask<Y>;
  (...args: TaskableArgs<A>): StreamTask<Y>;
};

/** `true` when `T` is exactly `never`; tuple-wrapped so the conditional does not distribute. */
type IsNever<T> = [T] extends [never] ? true : false;

/**
 * Selects the callable shape returned by `mo()` from the wrapped function's return type.
 * `never` is handled first because `[never]` is assignable to every tuple type and would
 * otherwise be classified as an async generator.
 */
type MoReturn<A extends unknown[], R> =
  IsNever<R> extends true
    ? Moroutine<A, R>
    : [R] extends [AsyncGenerator<infer Y, any, any>]
      ? StreamMoroutine<A, Y>
      : Moroutine<A, R>;

/**
 * Reports whether `fn` was declared with `async function*`.
 * Detection is by the name of the value's constructor, so no reference to the
 * (non-global) `AsyncGeneratorFunction` constructor is needed.
 */
function isAsyncGeneratorFunction(fn: unknown): boolean {
  return typeof fn === 'function' && fn.constructor.name === 'AsyncGeneratorFunction';
}

/**
 * Wraps a function to run on a worker thread. Must be called at module scope.
 * @param importMeta - The `import.meta` of the calling module, used to identify the source file.
 * @param fn - The function to offload to a worker thread.
 * @returns A callable that creates a {@link Task} (or {@link StreamTask} for `async function*`) when invoked.
 * @throws Error if the registry reports the calling module as frozen, i.e. `mo()` is
 *   called after a task from that module has already been dispatched.
 */
export function mo<A extends unknown[], R>(importMeta: ImportMeta, fn: (...args: A) => R): MoReturn<A, R> {
  const url = importMeta.url;

  if (isModuleFrozen(url)) {
    throw new Error(
      `Cannot call mo() for ${url} after a task from this module has been dispatched. ` +
        'All mo() calls must happen at module load time.',
    );
  }

  // Ids have the form `<module url>#<n>`, where n is the zero-based position of this
  // call among all mo() calls made for the same module URL.
  const index = counters.get(url) ?? 0;
  counters.set(url, index + 1);
  const id = `${url}#${index}`;

  registry.set(id, fn);

  // MoReturn<A, R> is a conditional type over still-generic parameters, so the compiler
  // cannot check either branch against it; the casts below are therefore unavoidable.
  if (isAsyncGeneratorFunction(fn)) {
    return ((...args: unknown[]) => new StreamTask(id, args)) as unknown as MoReturn<A, R>;
  }
  return ((...args: unknown[]) => new Task<R>(id, args)) as unknown as MoReturn<A, R>;
}