Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: transfer() for zero-copy ArrayBuffer, TypedArray, stream, and port passing

Supports all built-in transferable types: ArrayBuffer, MessagePort,
ReadableStream, WritableStream, TransformStream, TypedArray, DataView.
Extracts the appropriate transferable (e.g. .buffer for TypedArrays)
and passes it via the postMessage transfer list.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+133 -2
+4 -1
src/execute.ts
··· 1 1 import type { Worker } from 'node:worker_threads'; 2 2 import { freezeModule } from './registry.ts'; 3 3 import { serializeArg, deserializeArg } from './sync/reconstruct.ts'; 4 + import { extractTransferables } from './transfer.ts'; 4 5 5 6 let nextCallId = 0; 6 7 const pending = new Map<number, { resolve: (value: any) => void; reject: (reason: any) => void }>(); ··· 24 25 const callId = nextCallId++; 25 26 return new Promise<T>((resolve, reject) => { 26 27 pending.set(callId, { resolve, reject }); 27 - worker.postMessage({ callId, id, args: args.map(serializeArg) }); 28 + const extracted = extractTransferables(args); 29 + const msg = { callId, id, args: extracted.args.map(serializeArg) }; 30 + worker.postMessage(msg, extracted.transfer); 28 31 }); 29 32 }
+1
src/index.ts
··· 1 1 export { mo } from './mo.ts'; 2 2 export { Task } from './task.ts'; 3 3 export { workerPool } from './worker-pool.ts'; 4 + export { transfer } from './transfer.ts'; 4 5 export type { Runner } from './runner.ts'; 5 6 export { 6 7 AtomicBool,
+53
src/transfer.ts
··· 1 + import type { Transferable } from 'node:worker_threads'; 2 + import { MessagePort } from 'node:worker_threads'; 3 + 4 + const TRANSFER = Symbol.for('moroutine.transfer'); 5 + 6 + export interface Transferred<T> { 7 + readonly [TRANSFER]: true; 8 + readonly value: T; 9 + } 10 + 11 + export function transfer<T>(value: T): Transferred<T> { 12 + return { [TRANSFER]: true as const, value }; 13 + } 14 + 15 + export function extractTransferables(args: unknown[]): { args: unknown[]; transfer: Transferable[] } { 16 + const transferList: Transferable[] = []; 17 + const processedArgs = args.map((arg) => { 18 + if (typeof arg === 'object' && arg !== null && TRANSFER in arg) { 19 + const value = (arg as Transferred<unknown>).value; 20 + collectTransferables(value, transferList); 21 + return value; 22 + } 23 + return arg; 24 + }); 25 + return { args: processedArgs, transfer: transferList }; 26 + } 27 + 28 + function collectTransferables(value: unknown, list: Transferable[]): void { 29 + if (!(typeof value === 'object' && value !== null)) return; 30 + 31 + // Directly transferable 32 + if ( 33 + value instanceof ArrayBuffer || 34 + value instanceof MessagePort || 35 + value instanceof ReadableStream || 36 + value instanceof WritableStream 37 + ) { 38 + list.push(value); 39 + return; 40 + } 41 + 42 + // TypedArray or DataView — extract .buffer 43 + if (ArrayBuffer.isView(value) && value.buffer instanceof ArrayBuffer) { 44 + list.push(value.buffer); 45 + return; 46 + } 47 + 48 + // TransformStream — extract both streams 49 + if (value instanceof TransformStream) { 50 + list.push(value.readable, value.writable); 51 + return; 52 + } 53 + }
+4 -1
src/worker-entry.ts
··· 1 1 import { parentPort } from 'node:worker_threads'; 2 2 import { registry } from './registry.ts'; 3 3 import { deserializeArg, serializeArg } from './sync/reconstruct.ts'; 4 + import { extractTransferables } from './transfer.ts'; 4 5 5 6 const imported = new Set<string>(); 6 7 ··· 18 19 19 20 const deserializedArgs = args.map(deserializeArg); 20 21 const value = await fn(...deserializedArgs); 21 - parentPort!.postMessage({ callId, value: serializeArg(value) }); 22 + const extracted = extractTransferables([value]); 23 + const returnValue = serializeArg(extracted.args[0]); 24 + parentPort!.postMessage({ callId, value: returnValue }, extracted.transfer); 22 25 } catch (err) { 23 26 const message = err instanceof Error ? err.message : String(err); 24 27 parentPort!.postMessage({ callId, error: message });
+18
test/fixtures/transfer.ts
··· 1 + import { mo } from 'moroutine'; 2 + 3 + export const sumBuffer = mo(import.meta, (buf: ArrayBuffer): number => { 4 + const view = new Uint8Array(buf); 5 + let sum = 0; 6 + for (let i = 0; i < view.length; i++) { 7 + sum += view[i]; 8 + } 9 + return sum; 10 + }); 11 + 12 + export const sumUint8 = mo(import.meta, (arr: Uint8Array): number => { 13 + let sum = 0; 14 + for (let i = 0; i < arr.length; i++) { 15 + sum += arr[i]; 16 + } 17 + return sum; 18 + });
+53
test/transfer.test.ts
··· 1 + import { describe, it } from 'node:test'; 2 + import assert from 'node:assert/strict'; 3 + import { workerPool, transfer } from 'moroutine'; 4 + import { sumBuffer, sumUint8 } from './fixtures/transfer.ts'; 5 + 6 + describe('transfer', () => { 7 + it('transfers an ArrayBuffer to a worker (zero-copy)', async () => { 8 + const buf = new ArrayBuffer(4); 9 + const view = new Uint8Array(buf); 10 + view[0] = 1; view[1] = 2; view[2] = 3; view[3] = 4; 11 + 12 + const pool = workerPool(1); 13 + try { 14 + const sum = await pool(sumBuffer(transfer(buf))); 15 + assert.equal(sum, 10); 16 + // Original buffer should be detached after transfer 17 + assert.equal(buf.byteLength, 0); 18 + } finally { 19 + pool[Symbol.dispose](); 20 + } 21 + }); 22 + 23 + it('transfers a Uint8Array to a worker via its buffer', async () => { 24 + const arr = new Uint8Array([5, 10, 15, 20]); 25 + const originalBuffer = arr.buffer; 26 + 27 + const pool = workerPool(1); 28 + try { 29 + const sum = await pool(sumUint8(transfer(arr))); 30 + assert.equal(sum, 50); 31 + // Underlying buffer should be detached after transfer 32 + assert.equal(originalBuffer.byteLength, 0); 33 + } finally { 34 + pool[Symbol.dispose](); 35 + } 36 + }); 37 + 38 + it('without transfer, buffer is copied (not detached)', async () => { 39 + const buf = new ArrayBuffer(4); 40 + const view = new Uint8Array(buf); 41 + view[0] = 1; view[1] = 2; view[2] = 3; view[3] = 4; 42 + 43 + const pool = workerPool(1); 44 + try { 45 + const sum = await pool(sumBuffer(buf)); 46 + assert.equal(sum, 10); 47 + // Original buffer should NOT be detached (it was copied) 48 + assert.equal(buf.byteLength, 4); 49 + } finally { 50 + pool[Symbol.dispose](); 51 + } 52 + }); 53 + });