Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: StreamTask class and mo() async generator detection

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

+73 -2
+1
src/index.ts
··· 1 1 export { mo } from './mo.ts'; 2 2 export type { Arg } from './mo.ts'; 3 3 export { Task } from './task.ts'; 4 + export { StreamTask } from './stream-task.ts'; 4 5 export { workers } from './worker-pool.ts'; 5 6 export { transfer } from './transfer.ts'; 6 7 export type { Runner } from './runner.ts';
+22 -2
src/mo.ts
··· 1 1 import { registry, isModuleFrozen } from './registry.ts'; 2 2 import { Task } from './task.ts'; 3 + import { StreamTask } from './stream-task.ts'; 3 4 4 5 const counters = new Map<string, number>(); 5 6 ··· 19 20 (...args: TaskableArgs<A>): Task<R>; 20 21 }; 21 22 22 - export function mo<A extends unknown[], R>(importMeta: ImportMeta, fn: (...args: A) => R): Moroutine<A, R> { 23 + type StreamMoroutine<A extends unknown[], Y> = { 24 + (...args: A): StreamTask<Y>; 25 + (...args: TaskableArgs<A>): StreamTask<Y>; 26 + }; 27 + 28 + type IsNever<T> = [T] extends [never] ? true : false; 29 + type MoReturn<A extends unknown[], R> = IsNever<R> extends true 30 + ? Moroutine<A, R> 31 + : [R] extends [AsyncGenerator<infer Y, any, any>] 32 + ? StreamMoroutine<A, Y> 33 + : Moroutine<A, R>; 34 + 35 + function isAsyncGeneratorFunction(fn: Function): boolean { 36 + return fn.constructor.name === 'AsyncGeneratorFunction'; 37 + } 38 + 39 + export function mo<A extends unknown[], R>(importMeta: ImportMeta, fn: (...args: A) => R): MoReturn<A, R> { 23 40 const url = importMeta.url; 24 41 25 42 if (isModuleFrozen(url)) { ··· 35 52 36 53 registry.set(id, fn); 37 54 38 - return ((...args: unknown[]) => new Task<R>(id, args)) as Moroutine<A, R>; 55 + if (isAsyncGeneratorFunction(fn)) { 56 + return ((...args: unknown[]) => new StreamTask(id, args)) as any; 57 + } 58 + return ((...args: unknown[]) => new Task<R>(id, args)) as any; 39 59 }
+14
src/stream-task.ts
··· 1 + let nextUid = 0; 2 + 3 + /** A deferred streaming computation. When dispatched, returns an AsyncIterable instead of a Promise. */ 4 + export class StreamTask<T> { 5 + readonly uid: number; 6 + readonly id: string; 7 + readonly args: unknown[]; 8 + 9 + constructor(id: string, args: unknown[]) { 10 + this.uid = nextUid++; 11 + this.id = id; 12 + this.args = args; 13 + } 14 + }
+36
test/stream-task.test.ts
··· 1 + import { describe, it } from 'node:test'; 2 + import assert from 'node:assert/strict'; 3 + import { mo } from 'moroutine'; 4 + 5 + describe('StreamTask', () => { 6 + it('mo() with async function* returns StreamTask', async () => { 7 + const { StreamTask } = await import('../src/stream-task.ts'); 8 + const gen = mo(import.meta, async function* () { yield 1; }); 9 + const task = gen(); 10 + assert.ok(task instanceof StreamTask); 11 + }); 12 + 13 + it('mo() with regular function still returns Task', async () => { 14 + const { Task } = await import('../src/task.ts'); 15 + const { StreamTask } = await import('../src/stream-task.ts'); 16 + const fn = mo(import.meta, (x: number) => x * 2); 17 + const task = fn(2); 18 + assert.ok(task instanceof Task); 19 + assert.ok(!(task instanceof StreamTask)); 20 + }); 21 + 22 + it('StreamTask has uid, id, and args', async () => { 23 + const gen = mo(import.meta, async function* (n: number) { yield n; }); 24 + const task = gen(5); 25 + assert.equal(typeof task.id, 'string'); 26 + assert.deepEqual(task.args, [5]); 27 + assert.equal(typeof task.uid, 'number'); 28 + }); 29 + 30 + it('StreamTask uids are unique', async () => { 31 + const gen = mo(import.meta, async function* () { yield 1; }); 32 + const a = gen(); 33 + const b = gen(); 34 + assert.notEqual(a.uid, b.uid); 35 + }); 36 + });