Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor(serve): make listen() take AsyncIterable; intern PushChannel

Move PushChannel/pushChannel out of the public moroutine API and into
src/serve/ as an internal helper. listen() and ListenArgs now accept the
general AsyncIterable<number> interface so moroutine can transparently
transfer the iterable cross-thread without coupling to a specific type.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

+7 -9
-1
src/index.ts
··· 4 4 export { AsyncIterableTask } from './stream-task.ts'; 5 5 export { channel } from './channel.ts'; 6 6 export type { ChannelOptions } from './channel.ts'; 7 - export { pushChannel, PushChannel } from './push-channel.ts'; 8 7 export { workers } from './worker-pool.ts'; 9 8 export { transfer } from './transfer.ts'; 10 9 export { assign } from './assign.ts';
src/push-channel.ts src/serve/push-channel.ts
+2 -3
src/serve/listen.ts
··· 1 1 import { Socket } from 'node:net'; 2 2 import type { Server } from 'node:net'; 3 - import type { PushChannel } from '../push-channel.ts'; 4 3 import type { Tuple } from '../shared/tuple.ts'; 5 4 import type { Int32Atomic } from '../shared/int32-atomic.ts'; 6 5 import type { ListenOptions } from './types.ts'; ··· 14 13 */ 15 14 export async function listen( 16 15 server: Server, 17 - ch: PushChannel<number>, 16 + fds: AsyncIterable<number>, 18 17 counters: Tuple<Int32Atomic[]>, 19 18 slot: number, 20 19 opts: Required<ListenOptions>, ··· 30 29 } 31 30 }; 32 31 33 - for await (const fd of ch) { 32 + for await (const fd of fds) { 34 33 const socket = new Socket({ fd, readable: true, writable: true }); 35 34 socket.setNoDelay(true); 36 35 active.add(socket);
+1 -2
src/serve/types.ts
··· 1 - import type { PushChannel } from '../push-channel.ts'; 2 1 import type { Tuple } from '../shared/tuple.ts'; 3 2 import type { Int32Atomic } from '../shared/int32-atomic.ts'; 4 3 import type { WorkerHandle } from '../runner.ts'; ··· 14 13 * The shape may evolve; users only ever spread it into `listen()`. 15 14 */ 16 15 export type ListenArgs = readonly [ 17 - channel: PushChannel<number>, 16 + fds: AsyncIterable<number>, 18 17 counters: Tuple<Int32Atomic[]>, 19 18 slot: number, 20 19 opts: Required<ListenOptions>,
+4 -3
test/serve/listen.test.ts
··· 4 4 import { openSync } from 'node:fs'; 5 5 import { createServer as createNetServer, connect } from 'node:net'; 6 6 import { createServer } from 'node:http'; 7 - import { pushChannel as channel, shared, int32atomic } from 'moroutine'; 7 + import { shared, int32atomic } from 'moroutine'; 8 + import { pushChannel } from '../../src/serve/push-channel.ts'; 8 9 import { listen } from 'moroutine/serve'; 9 10 10 11 // Helper: accept a real TCP connection on a scratch listener and yield its fd. ··· 33 34 34 35 describe('listen()', () => { 35 36 it('emits received fds as connections on the user server', { timeout: 5000 }, async () => { 36 - const ch = channel<number>({ highWaterMark: 4 }); 37 + const ch = pushChannel<number>({ highWaterMark: 4 }); 37 38 const counters = shared([int32atomic]); 38 39 const http = createServer((req, res) => { 39 40 res.end('ok'); ··· 51 52 }); 52 53 53 54 it('decrements counter on socket close', { timeout: 5000 }, async () => { 54 - const ch = channel<number>({ highWaterMark: 4 }); 55 + const ch = pushChannel<number>({ highWaterMark: 4 }); 55 56 const counters = shared([int32atomic]); 56 57 counters.elements[0].store(1); // simulate main having incremented 57 58 const http = createServer((req, res) => {