Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: Channel class with distributor for fan-out (work stealing)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+211 -23
+124 -9
src/channel.ts
··· 1 + import { MessageChannel, MessagePort } from 'node:worker_threads'; 2 + import type { Transferable } from 'node:worker_threads'; 3 + import { serializeArg } from './shared/reconstruct.ts'; 4 + import { collectTransferables } from './transfer.ts'; 5 + import { StreamTask } from './stream-task.ts'; 6 + import { runStreamOnDedicated } from './dedicated-runner.ts'; 7 + 1 8 const CHANNEL = Symbol.for('moroutine.channel'); 2 9 3 10 export interface ChannelOptions { 4 11 highWaterMark?: number; 5 12 } 6 13 7 - export interface ChannelMarker<T> { 8 - readonly [CHANNEL]: { 9 - iterable: AsyncIterable<T>; 10 - options?: ChannelOptions; 11 - }; 14 + const DEFAULT_HIGH_WATER = 16; 15 + 16 + interface Consumer { 17 + port: MessagePort; 18 + ready: boolean; 19 + } 20 + 21 + class Distributor<T> { 22 + private consumers: Consumer[] = []; 23 + private sourceIterator: AsyncIterator<T>; 24 + private done = false; 25 + private error: Error | null = null; 26 + private pulling = false; 27 + 28 + constructor(source: AsyncIterable<T>) { 29 + this.sourceIterator = source[Symbol.asyncIterator](); 30 + } 31 + 32 + addConsumer(): MessagePort { 33 + const { port1, port2 } = new MessageChannel(); 34 + port1.unref(); 35 + 36 + const consumer: Consumer = { port: port1, ready: true }; 37 + 38 + port1.on('message', (signal: string) => { 39 + if (signal === 'pause') { 40 + consumer.ready = false; 41 + } else if (signal === 'resume') { 42 + consumer.ready = true; 43 + this.tryPull(); 44 + } 45 + }); 46 + 47 + port1.on('close', () => { 48 + this.consumers = this.consumers.filter((c) => c !== consumer); 49 + }); 50 + 51 + this.consumers.push(consumer); 52 + 53 + if (this.done) { 54 + if (this.error) { 55 + port1.postMessage({ done: true, error: this.error.message }); 56 + } else { 57 + port1.postMessage({ done: true }); 58 + } 59 + try { port1.close(); } catch {} 60 + } else { 61 + this.tryPull(); 62 + } 63 + 64 + return port2; 65 + } 66 + 67 + private async tryPull(): Promise<void> { 68 + if (this.pulling || this.done) return; 69 + this.pulling = true; 70 + 71 + try { 72 + while (!this.done) { 73 + const readyConsumer = this.consumers.find((c) => c.ready); 74 + if (!readyConsumer) break; // no ready consumers, pause pulling 75 + 76 + const result = await this.sourceIterator.next(); 77 + if (result.done) { 78 + this.done = true; 79 + for (const c of this.consumers) { 80 + c.port.postMessage({ done: true }); 81 + try { c.port.close(); } catch {} 82 + } 83 + break; 84 + } 85 + 86 + const serialized = serializeArg(result.value); 87 + const transferList: Transferable[] = []; 88 + collectTransferables(result.value, transferList); 89 + readyConsumer.port.postMessage({ value: serialized, done: false }, transferList as any[]); 90 + 91 + // After sending, yield to event loop so pause signals can arrive 92 + await new Promise((r) => setImmediate(r)); 93 + } 94 + } catch (err) { 95 + this.done = true; 96 + this.error = err instanceof Error ? err : new Error(String(err)); 97 + for (const c of this.consumers) { 98 + c.port.postMessage({ done: true, error: this.error.message }); 99 + try { c.port.close(); } catch {} 100 + } 101 + } 102 + 103 + this.pulling = false; 104 + } 105 + } 106 + 107 + export class Channel<T> { 108 + private distributor: Distributor<T> | null = null; 109 + private source: AsyncIterable<T>; 110 + readonly [CHANNEL] = true; 111 + 112 + constructor(source: AsyncIterable<T>) { 113 + this.source = source; 114 + } 115 + 116 + addConsumer(): MessagePort { 117 + if (!this.distributor) { 118 + let source: AsyncIterable<T> = this.source; 119 + if (source instanceof StreamTask) { 120 + source = runStreamOnDedicated((source as any).id, (source as any).args) as AsyncIterable<T>; 121 + } 122 + this.distributor = new Distributor<T>(source); 123 + } 124 + return this.distributor.addConsumer(); 125 + } 12 126 } 13 127 14 128 /** 15 - * Wraps an AsyncIterable for streaming to a worker. 16 - * @param iterable - The async iterable to pipe to the worker. 129 + * Creates a channel for streaming values to workers. Supports fan-out when 130 + * the same channel is passed to multiple tasks. 131 + * @param iterable - The async iterable or StreamTask to distribute. 17 132 * @param opts - Optional. highWaterMark controls backpressure buffering (default: 16). 18 - * @returns The iterable, typed as AsyncIterable<T> for transparent moroutine arg use. 133 + * @returns A Channel, typed as AsyncIterable<T> for transparent moroutine arg use. 19 134 */ 20 135 export function channel<T>(iterable: AsyncIterable<T>, opts?: ChannelOptions): AsyncIterable<T> { 21 - return { [CHANNEL]: { iterable, options: opts } } as unknown as AsyncIterable<T>; 136 + return new Channel(iterable) as unknown as AsyncIterable<T>; 22 137 } 23 138 24 139 export { CHANNEL };
+4 -14
src/execute.ts
··· 7 7 import { Task } from './task.ts'; 8 8 import { StreamTask } from './stream-task.ts'; 9 9 import { runStreamOnDedicated } from './dedicated-runner.ts'; 10 - import { CHANNEL } from './channel.ts'; 10 + import { CHANNEL, Channel } from './channel.ts'; 11 11 import type { ChannelOptions } from './channel.ts'; 12 12 13 13 let nextCallId = 0; ··· 93 93 streamPortStack[streamPortStack.length - 1].push(port2); 94 94 return port2; 95 95 } 96 - if (typeof arg === 'object' && arg !== null && CHANNEL in arg) { 97 - const data = (arg as any)[CHANNEL]; 98 - let iterable = data.iterable; 99 - const highWater = data.options?.highWaterMark ?? DEFAULT_HIGH_WATER; 100 - 101 - // If the iterable is a StreamTask, dispatch it to its dedicated worker first 102 - if (iterable instanceof StreamTask) { 103 - iterable = runStreamOnDedicated(iterable.id, iterable.args); 104 - } 105 - 106 - const { port1, port2 } = new MessageChannel(); 107 - port1.unref(); 108 - pipeToPort(iterable, port1, highWater); 96 + // Channel wrapper — supports fan-out via shared distributor 97 + if (arg instanceof Channel) { 98 + const port2 = arg.addConsumer(); 109 99 streamPortStack[streamPortStack.length - 1].push(port2); 110 100 return port2; 111 101 }
+72
test/channel-fanout.test.ts
··· 1 + import { describe, it } from 'node:test'; 2 + import assert from 'node:assert/strict'; 3 + import { workers, channel } from 'moroutine'; 4 + import { collectItems, generate } from './fixtures/channel-fanout.ts'; 5 + 6 + describe('channel fan-out', () => { 7 + it('distributes items across multiple consumers', async () => { 8 + const data = channel(generate(20)); 9 + const run = workers(4); 10 + try { 11 + const results: any[] = await run([ 12 + collectItems(data), 13 + collectItems(data), 14 + collectItems(data), 15 + collectItems(data), 16 + ]); 17 + const all = [...results[0], ...results[1], ...results[2], ...results[3]].sort((x: number, y: number) => x - y); 18 + assert.deepEqual(all, Array.from({ length: 20 }, (_, i) => i)); 19 + } finally { 20 + run[Symbol.dispose](); 21 + } 22 + }); 23 + 24 + it('each item goes to exactly one consumer', async () => { 25 + const data = channel(generate(100)); 26 + const run = workers(2); 27 + try { 28 + const results: any[] = await run([ 29 + collectItems(data), 30 + collectItems(data), 31 + ]); 32 + const a: number[] = results[0]; 33 + const b: number[] = results[1]; 34 + const setA = new Set(a); 35 + for (const item of b) { 36 + assert.ok(!setA.has(item), `Item ${item} appeared in both consumers`); 37 + } 38 + assert.equal(a.length + b.length, 100); 39 + } finally { 40 + run[Symbol.dispose](); 41 + } 42 + }); 43 + 44 + it('single consumer via channel() still works', async () => { 45 + async function* numbers() { yield 1; yield 2; yield 3; } 46 + const run = workers(1); 47 + try { 48 + const result = await run(collectItems(channel(numbers()))); 49 + assert.deepEqual(result, [1, 2, 3]); 50 + } finally { 51 + run[Symbol.dispose](); 52 + } 53 + }); 54 + 55 + it('channel with local AsyncIterable fan-out', async () => { 56 + async function* localGen() { 57 + for (let i = 0; i < 10; i++) yield i; 58 + } 59 + const data = channel(localGen()); 60 + const run = workers(2); 61 + try { 62 + const results: any[] = await run([ 63 + collectItems(data), 64 + collectItems(data), 65 + ]); 66 + const all = [...results[0], ...results[1]].sort((x: number, y: number) => x - y); 67 + assert.deepEqual(all, Array.from({ length: 10 }, (_, i) => i)); 68 + } finally { 69 + run[Symbol.dispose](); 70 + } 71 + }); 72 + });
+11
test/fixtures/channel-fanout.ts
··· 1 + import { mo } from 'moroutine'; 2 + 3 + export const collectItems = mo(import.meta, async (input: AsyncIterable<number>): Promise<number[]> => { 4 + const items: number[] = []; 5 + for await (const n of input) items.push(n); 6 + return items; 7 + }); 8 + 9 + export const generate = mo(import.meta, async function* (n: number) { 10 + for (let i = 0; i < n; i++) yield i; 11 + });