Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: auto-detect and transfer AbortSignal arguments to workers

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+80
+7
.changeset/abort-signal-transfer.md
··· 1 + --- 2 + "moroutine": minor 3 + --- 4 + 5 + Auto-detect and transfer AbortSignal arguments to workers 6 + 7 + AbortSignal args are automatically detected, marked transferable via `util.transferableAbortSignal()`, and transferred to the worker. Works with regular tasks, streaming moroutines, and dedicated workers.
+7
src/execute.ts
··· 1 1 import type { Worker } from 'node:worker_threads'; 2 2 import { MessageChannel } from 'node:worker_threads'; 3 3 import type { MessagePort, Transferable } from 'node:worker_threads'; 4 + import { transferableAbortSignal } from 'node:util'; 4 5 import { freezeModule } from './registry.ts'; 5 6 import { serializeArg, deserializeArg } from './shared/reconstruct.ts'; 6 7 import { extractTransferables, collectTransferables } from './transfer.ts'; ··· 90 91 } 91 92 92 93 function prepareArg(arg: unknown): unknown { 94 + // Auto-detect AbortSignal args — mark transferable and include in transfer list 95 + if (arg instanceof AbortSignal) { 96 + const signal = transferableAbortSignal(arg); 97 + streamPortStack[streamPortStack.length - 1].push(signal as unknown as MessagePort); 98 + return signal; 99 + } 93 100 // Auto-detect AsyncGenerator args — pipe via MessageChannel 94 101 if (isAsyncGenerator(arg)) { 95 102 const { port1, port2 } = new MessageChannel();
+43
test/abort-signal.test.ts
··· 1 + import { describe, it } from 'node:test'; 2 + import assert from 'node:assert/strict'; 3 + import { workers } from 'moroutine'; 4 + import { waitForAbort, collectUntilAbort } from './fixtures/abort-signal.ts'; 5 + 6 + describe('AbortSignal transfer', () => { 7 + it('transfers AbortSignal to a worker task', async () => { 8 + const ac = new AbortController(); 9 + using run = workers(1); 10 + const promise = run(waitForAbort(ac.signal)); 11 + setTimeout(() => ac.abort(), 50); 12 + const result = await promise; 13 + assert.equal(result, 'aborted'); 14 + }); 15 + 16 + it('transfers already-aborted signal', async () => { 17 + const ac = new AbortController(); 18 + ac.abort(); 19 + using run = workers(1); 20 + const result = await run(waitForAbort(ac.signal)); 21 + assert.equal(result, 'already-aborted'); 22 + }); 23 + 24 + it('transfers AbortSignal to a streaming moroutine', async () => { 25 + const ac = new AbortController(); 26 + using run = workers(1); 27 + const results: number[] = []; 28 + setTimeout(() => ac.abort(), 80); 29 + for await (const n of run(collectUntilAbort(ac.signal))) { 30 + results.push(n); 31 + } 32 + assert.ok(results.length >= 2, `expected at least 2 items, got ${results.length}`); 33 + assert.equal(results[results.length - 1], -1, 'last item should be -1 sentinel'); 34 + }); 35 + 36 + it('transfers AbortSignal on dedicated worker', async () => { 37 + const ac = new AbortController(); 38 + const promise = waitForAbort(ac.signal); 39 + setTimeout(() => ac.abort(), 50); 40 + const result = await promise; 41 + assert.equal(result, 'aborted'); 42 + }); 43 + });
+23
test/fixtures/abort-signal.ts
··· 1 + import { setTimeout } from 'node:timers/promises'; 2 + import { mo } from 'moroutine'; 3 + 4 + export const waitForAbort = mo(import.meta, (signal: AbortSignal): Promise<string> => { 5 + return new Promise((resolve) => { 6 + if (signal.aborted) { 7 + resolve('already-aborted'); 8 + return; 9 + } 10 + signal.addEventListener('abort', () => { 11 + resolve('aborted'); 12 + }); 13 + }); 14 + }); 15 + 16 + export const collectUntilAbort = mo(import.meta, async function* (signal: AbortSignal) { 17 + let i = 0; 18 + while (!signal.aborted) { 19 + yield i++; 20 + await setTimeout(10); 21 + } 22 + yield -1; // sentinel indicating abort was seen 23 + });