Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: SharedStruct cross-worker serialization

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Devin Ivy 238db755 b1733ff5

+75 -6
+34 -5
src/shared/reconstruct.ts
··· 1 + import { SharedStruct } from './shared-struct.ts'; 2 + 1 3 const SHARED = Symbol.for('moroutine.shared'); 2 4 3 5 const registry = new Map<string, new (buffer: SharedArrayBuffer, byteOffset: number) => unknown>(); ··· 8 10 9 11 export function serializeArg(arg: unknown): unknown { 10 12 if (typeof arg === 'object' && arg !== null && SHARED in arg) { 11 - const shared = (arg as { [key: symbol]: () => { tag: string; buffer: SharedArrayBuffer; byteOffset: number } })[SHARED](); 12 - return { __shared__: shared.tag, buffer: shared.buffer, byteOffset: shared.byteOffset }; 13 + const data = (arg as any)[SHARED](); 14 + if (data.tag === 'SharedStruct') { 15 + const serializedFields: Record<string, unknown> = {}; 16 + for (const key in data.fields) { 17 + serializedFields[key] = serializeStructField(data.fields[key]); 18 + } 19 + return { __shared__: 'SharedStruct', fields: serializedFields }; 20 + } 21 + return { __shared__: data.tag, buffer: data.buffer, byteOffset: data.byteOffset }; 13 22 } 14 23 return arg; 15 24 } 16 25 26 + function serializeStructField(data: { tag: string; [key: string]: unknown }): unknown { 27 + if (data.tag === 'SharedStruct') { 28 + const serializedFields: Record<string, unknown> = {}; 29 + for (const key in data.fields as Record<string, unknown>) { 30 + serializedFields[key] = serializeStructField((data.fields as any)[key]); 31 + } 32 + return { __shared__: 'SharedStruct', fields: serializedFields }; 33 + } 34 + return { __shared__: data.tag, buffer: data.buffer, byteOffset: data.byteOffset }; 35 + } 36 + 17 37 export function deserializeArg(arg: unknown): unknown { 18 38 if (typeof arg === 'object' && arg !== null && '__shared__' in arg) { 19 - const data = arg as { __shared__: string; buffer: SharedArrayBuffer; byteOffset: number }; 20 - const ctor = registry.get(data.__shared__); 21 - if (ctor) return new ctor(data.buffer, data.byteOffset); 39 + const data = arg as { __shared__: string; [key: string]: unknown }; 40 + if (data.__shared__ === 'SharedStruct') { 41 + const fields = data.fields as Record<string, unknown>; 42 + const reconstructed: Record<string, unknown> = {}; 43 + for (const key in fields) { 44 + reconstructed[key] = deserializeArg(fields[key]); 45 + } 46 + return new SharedStruct(reconstructed as any); 47 + } 48 + const typedData = data as { __shared__: string; buffer: SharedArrayBuffer; byteOffset: number }; 49 + const ctor = registry.get(typedData.__shared__); 50 + if (ctor) return new ctor(typedData.buffer, typedData.byteOffset); 22 51 } 23 52 return arg; 24 53 }
+12
test/fixtures/shared-struct.ts
··· 1 + import { mo } from 'moroutine'; 2 + import type { SharedStruct, Int32 } from 'moroutine'; 3 + 4 + type Point = SharedStruct<{ x: Int32; y: Int32 }>; 5 + 6 + export const readPoint = mo(import.meta, (point: Point): { x: number; y: number } => { 7 + return point.load(); 8 + }); 9 + 10 + export const writePoint = mo(import.meta, (point: Point, x: number, y: number): void => { 11 + point.store({ x, y }); 12 + });
+29 -1
test/shared/cross-worker.test.ts
··· 1 1 import { describe, it } from 'node:test'; 2 2 import assert from 'node:assert/strict'; 3 - import { workerPool, AtomicInt32, Mutex, RwLock } from 'moroutine'; 3 + import { workerPool, AtomicInt32, Mutex, RwLock, SharedStruct, Int32, slab } from 'moroutine'; 4 4 import { atomicAdd, mutexIncrement, rwlockRead } from '../fixtures/sync.ts'; 5 + import { readPoint, writePoint } from '../fixtures/shared-struct.ts'; 5 6 6 7 describe('sync primitives across workers', () => { 7 8 it('AtomicInt32 is shared across worker threads', async () => { ··· 45 46 ]); 46 47 assert.equal(a, 42); 47 48 assert.equal(b, 42); 49 + } finally { 50 + pool[Symbol.dispose](); 51 + } 52 + }); 53 + 54 + it('SharedStruct is shared across worker threads', async () => { 55 + const [x, y] = slab(Int32, Int32); 56 + const point = new SharedStruct({ x, y }); 57 + point.store({ x: 1, y: 2 }); 58 + 59 + const pool = workerPool(1); 60 + try { 61 + const result = await pool(readPoint(point)); 62 + assert.deepEqual(result, { x: 1, y: 2 }); 63 + } finally { 64 + pool[Symbol.dispose](); 65 + } 66 + }); 67 + 68 + it('worker can write to SharedStruct', async () => { 69 + const [x, y] = slab(Int32, Int32); 70 + const point = new SharedStruct({ x, y }); 71 + 72 + const pool = workerPool(1); 73 + try { 74 + await pool(writePoint(point, 10, 20)); 75 + assert.deepEqual(point.load(), { x: 10, y: 20 }); 48 76 } finally { 49 77 pool[Symbol.dispose](); 50 78 }