Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: task-arg resolution and caching on workers

When a Task is passed as an argument to another moroutine, the worker
resolves it on first encounter and caches the result by task uid.
Subsequent calls with the same task reuse the cached value. This enables
worker-local context patterns — the resolved value never crosses thread
boundaries, so it can hold complex non-serializable state like database
connections or compiled indexes.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Devin Ivy 981086bc a9f38f47

+138 -5
+9 -1
src/execute.ts
··· 2 2 import { freezeModule } from './registry.ts'; 3 3 import { serializeArg, deserializeArg } from './shared/reconstruct.ts'; 4 4 import { extractTransferables } from './transfer.ts'; 5 + import { Task } from './task.ts'; 5 6 6 7 let nextCallId = 0; 7 8 const pending = new Map<number, { resolve: (value: any) => void; reject: (reason: any) => void }>(); ··· 19 20 }); 20 21 } 21 22 23 + function prepareArg(arg: unknown): unknown { 24 + if (arg instanceof Task) { 25 + return { __task__: arg.uid, id: arg.id, args: arg.args.map(prepareArg) }; 26 + } 27 + return serializeArg(arg); 28 + } 29 + 22 30 export function execute<T>(worker: Worker, id: string, args: unknown[]): Promise<T> { 23 31 const url = id.slice(0, id.lastIndexOf('#')); 24 32 freezeModule(url); ··· 26 34 return new Promise<T>((resolve, reject) => { 27 35 pending.set(callId, { resolve, reject }); 28 36 const extracted = extractTransferables(args); 29 - const msg = { callId, id, args: extracted.args.map(serializeArg) }; 37 + const msg = { callId, id, args: extracted.args.map(prepareArg) }; 30 38 worker.postMessage(msg, extracted.transfer); 31 39 }); 32 40 }
+7 -2
src/mo.ts
··· 9 9 * @param fn - The function to offload to a worker thread. 10 10 * @returns A function that creates a {@link Task} when called. 11 11 */ 12 - export function mo<A extends unknown[], R>(importMeta: ImportMeta, fn: (...args: A) => R): (...args: A) => Task<R> { 12 + type TaskableArgs<A extends unknown[]> = { [K in keyof A]: A[K] | Task<A[K]> }; 13 + 14 + export function mo<A extends unknown[], R>( 15 + importMeta: ImportMeta, 16 + fn: (...args: A) => R, 17 + ): (...args: TaskableArgs<A>) => Task<R> { 13 18 const url = importMeta.url; 14 19 15 20 if (isModuleFrozen(url)) { ··· 25 30 26 31 registry.set(id, fn); 27 32 28 - return (...args: A) => new Task<R>(id, args); 33 + return ((...args: unknown[]) => new Task<R>(id, args)) as (...args: TaskableArgs<A>) => Task<R>; 29 34 }
+4
src/task.ts
··· 1 1 import { runOnDedicated } from './dedicated-runner.ts'; 2 2 3 + let nextUid = 0; 4 + 3 5 /** A deferred computation that runs on a worker thread when awaited. */ 4 6 export class Task<T> { 7 + readonly uid: number; 5 8 readonly id: string; 6 9 readonly args: unknown[]; 7 10 8 11 constructor(id: string, args: unknown[]) { 12 + this.uid = nextUid++; 9 13 this.id = id; 10 14 this.args = args; 11 15 }
+29 -2
src/worker-entry.ts
··· 5 5 import type { Transferable } from 'node:worker_threads'; 6 6 7 7 const imported = new Set<string>(); 8 + const taskCache = new Map<number, unknown>(); 9 + 10 + function isTaskArg(arg: unknown): arg is { __task__: number; id: string; args: unknown[] } { 11 + return typeof arg === 'object' && arg !== null && '__task__' in arg; 12 + } 13 + 14 + async function resolveArg(arg: unknown): Promise<unknown> { 15 + if (isTaskArg(arg)) { 16 + if (taskCache.has(arg.__task__)) { 17 + return taskCache.get(arg.__task__); 18 + } 19 + // Resolve the task's own args recursively 20 + const resolvedArgs = await Promise.all(arg.args.map(resolveArg)); 21 + // Import the module and run the function 22 + const url = arg.id.slice(0, arg.id.lastIndexOf('#')); 23 + if (!imported.has(url)) { 24 + await import(url); 25 + imported.add(url); 26 + } 27 + const fn = registry.get(arg.id); 28 + if (!fn) throw new Error(`Moroutine not found: ${arg.id}`); 29 + const value = await fn(...resolvedArgs); 30 + taskCache.set(arg.__task__, value); 31 + return value; 32 + } 33 + return deserializeArg(arg); 34 + } 8 35 9 36 parentPort!.on('message', async (msg: { callId: number; id: string; args: unknown[] }) => { 10 37 const { callId, id, args } = msg; ··· 18 45 const fn = registry.get(id); 19 46 if (!fn) throw new Error(`Moroutine not found: ${id}`); 20 47 21 - const deserializedArgs = args.map(deserializeArg); 22 - const value = await fn(...deserializedArgs); 48 + const resolvedArgs = await Promise.all(args.map(resolveArg)); 49 + const value = await fn(...resolvedArgs); 23 50 const returnValue = serializeArg(value); 24 51 const transferList: Transferable[] = []; 25 52 collectTransferables(value, transferList);
+68
test/context.test.ts
··· 1 + import { describe, it } from 'node:test'; 2 + import assert from 'node:assert/strict'; 3 + import { workers } from 'moroutine'; 4 + import { makeCtx, identity, readCtx, addWithCtx } from './fixtures/context.ts'; 5 + 6 + describe('task-arg caching', () => { 7 + it('task arg is resolved on the worker', async () => { 8 + const ctx = makeCtx('hello'); 9 + const run = workers(1); 10 + try { 11 + const result = await run(readCtx(ctx)); 12 + assert.equal(result.value, 'hello'); 13 + } finally { 14 + run[Symbol.dispose](); 15 + } 16 + }); 17 + 18 + it('task arg is cached — same task evaluated once per worker', async () => { 19 + const ctx = makeCtx('test'); 20 + const run = workers(1); 21 + try { 22 + const r1 = await run(readCtx(ctx)); 23 + const r2 = await run(readCtx(ctx)); 24 + // Same workerInitId means ctx was only evaluated once 25 + assert.equal(r1.workerInitId, r2.workerInitId); 26 + assert.equal(r1.value, 'test'); 27 + } finally { 28 + run[Symbol.dispose](); 29 + } 30 + }); 31 + 32 + it('different workers each evaluate the task independently', async () => { 33 + const ctx = makeCtx('test'); 34 + const run = workers(2); 35 + try { 36 + const r1 = await run(readCtx(ctx)); 37 + const r2 = await run(readCtx(ctx)); 38 + // Each worker's initCount starts at 0, so both get workerInitId=1 39 + assert.equal(r1.workerInitId, 1); 40 + assert.equal(r2.workerInitId, 1); 41 + } finally { 42 + run[Symbol.dispose](); 43 + } 44 + }); 45 + 46 + it('task arg works alongside regular args', async () => { 47 + const ctx = identity({ multiplier: 3 }); 48 + const run = workers(1); 49 + try { 50 + const result = await run(addWithCtx(ctx, 2, 5)); 51 + assert.equal(result, 21); // (2 + 5) * 3 52 + } finally { 53 + run[Symbol.dispose](); 54 + } 55 + }); 56 + 57 + it('moroutine task arg resolves and caches', async () => { 58 + const ctx = makeCtx('nested'); 59 + const run = workers(1); 60 + try { 61 + const result = await run(readCtx(ctx)); 62 + assert.equal(result.value, 'nested'); 63 + assert.equal(typeof result.workerInitId, 'number'); 64 + } finally { 65 + run[Symbol.dispose](); 66 + } 67 + }); 68 + });
+21
test/fixtures/context.ts
··· 1 + import { mo } from 'moroutine'; 2 + 3 + let initCount = 0; 4 + 5 + export const makeCtx = mo(import.meta, (value: string): { value: string; workerInitId: number } => { 6 + initCount++; 7 + return { value, workerInitId: initCount }; 8 + }); 9 + 10 + export const identity = mo(import.meta, <T>(value: T): T => value); 11 + 12 + export const readCtx = mo( 13 + import.meta, 14 + (ctx: { value: string; workerInitId: number }): { value: string; workerInitId: number } => { 15 + return ctx; 16 + }, 17 + ); 18 + 19 + export const addWithCtx = mo(import.meta, (ctx: { multiplier: number }, a: number, b: number): number => { 20 + return (a + b) * ctx.multiplier; 21 + });