Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: sync primitive serialization protocol for cross-worker transfer

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Devin Ivy 70ca715c 98dd6fbd

+119 -4
+3 -2
src/execute.ts
··· 1 1 import type { Worker } from 'node:worker_threads'; 2 2 import { freezeModule } from './registry.ts'; 3 + import { serializeArg, deserializeArg } from './sync/reconstruct.ts'; 3 4 4 5 let nextCallId = 0; 5 6 const pending = new Map<number, { resolve: (value: any) => void; reject: (reason: any) => void }>(); ··· 12 13 if (msg.error !== undefined) { 13 14 call.reject(new Error(msg.error)); 14 15 } else { 15 - call.resolve(msg.value); 16 + call.resolve(deserializeArg(msg.value)); 16 17 } 17 18 }); 18 19 } ··· 23 24 const callId = nextCallId++; 24 25 return new Promise<T>((resolve, reject) => { 25 26 pending.set(callId, { resolve, reject }); 26 - worker.postMessage({ callId, id, args }); 27 + worker.postMessage({ callId, id, args: args.map(serializeArg) }); 27 28 }); 28 29 }
+8
src/sync/atomic-bool.ts
··· 1 + import { registerSync } from './reconstruct.ts'; 2 + 1 3 export class AtomicBool { 2 4 static readonly byteSize = 1; 3 5 static readonly byteAlignment = 1; ··· 36 38 compareExchange(expected: boolean, replacement: boolean): boolean { 37 39 return Atomics.compareExchange(this.view, 0, expected ? 1 : 0, replacement ? 1 : 0) !== 0; 38 40 } 41 + 42 + [Symbol.for('moroutine.sync')](): { tag: string; buffer: SharedArrayBuffer; byteOffset: number } { 43 + return { tag: 'AtomicBool', buffer: this.view.buffer as SharedArrayBuffer, byteOffset: this.view.byteOffset }; 44 + } 39 45 } 46 + 47 + registerSync('AtomicBool', AtomicBool);
+8
src/sync/atomic-int16.ts
··· 1 + import { registerSync } from './reconstruct.ts'; 2 + 1 3 export class AtomicInt16 { 2 4 static readonly byteSize = 2; 3 5 static readonly byteAlignment = 2; ··· 44 46 compareExchange(expected: number, replacement: number): number { 45 47 return Atomics.compareExchange(this.view, 0, expected, replacement); 46 48 } 49 + 50 + [Symbol.for('moroutine.sync')](): { tag: string; buffer: SharedArrayBuffer; byteOffset: number } { 51 + return { tag: 'AtomicInt16', buffer: this.view.buffer as SharedArrayBuffer, byteOffset: this.view.byteOffset }; 52 + } 47 53 } 54 + 55 + registerSync('AtomicInt16', AtomicInt16);
+8
src/sync/atomic-int32.ts
··· 1 + import { registerSync } from './reconstruct.ts'; 2 + 1 3 export class AtomicInt32 { 2 4 static readonly byteSize = 4; 3 5 static readonly byteAlignment = 4; ··· 44 46 compareExchange(expected: number, replacement: number): number { 45 47 return Atomics.compareExchange(this.view, 0, expected, replacement); 46 48 } 49 + 50 + [Symbol.for('moroutine.sync')](): { tag: string; buffer: SharedArrayBuffer; byteOffset: number } { 51 + return { tag: 'AtomicInt32', buffer: this.view.buffer as SharedArrayBuffer, byteOffset: this.view.byteOffset }; 52 + } 47 53 } 54 + 55 + registerSync('AtomicInt32', AtomicInt32);
+8
src/sync/atomic-int64.ts
··· 1 + import { registerSync } from './reconstruct.ts'; 2 + 1 3 export class AtomicInt64 { 2 4 static readonly byteSize = 8; 3 5 static readonly byteAlignment = 8; ··· 44 46 compareExchange(expected: bigint, replacement: bigint): bigint { 45 47 return Atomics.compareExchange(this.view, 0, expected, replacement); 46 48 } 49 + 50 + [Symbol.for('moroutine.sync')](): { tag: string; buffer: SharedArrayBuffer; byteOffset: number } { 51 + return { tag: 'AtomicInt64', buffer: this.view.buffer as SharedArrayBuffer, byteOffset: this.view.byteOffset }; 52 + } 47 53 } 54 + 55 + registerSync('AtomicInt64', AtomicInt64);
+8
src/sync/atomic-int8.ts
··· 1 + import { registerSync } from './reconstruct.ts'; 2 + 1 3 export class AtomicInt8 { 2 4 static readonly byteSize = 1; 3 5 static readonly byteAlignment = 1; ··· 44 46 compareExchange(expected: number, replacement: number): number { 45 47 return Atomics.compareExchange(this.view, 0, expected, replacement); 46 48 } 49 + 50 + [Symbol.for('moroutine.sync')](): { tag: string; buffer: SharedArrayBuffer; byteOffset: number } { 51 + return { tag: 'AtomicInt8', buffer: this.view.buffer as SharedArrayBuffer, byteOffset: this.view.byteOffset }; 52 + } 47 53 } 54 + 55 + registerSync('AtomicInt8', AtomicInt8);
+8
src/sync/atomic-uint16.ts
··· 1 + import { registerSync } from './reconstruct.ts'; 2 + 1 3 export class AtomicUint16 { 2 4 static readonly byteSize = 2; 3 5 static readonly byteAlignment = 2; ··· 44 46 compareExchange(expected: number, replacement: number): number { 45 47 return Atomics.compareExchange(this.view, 0, expected, replacement); 46 48 } 49 + 50 + [Symbol.for('moroutine.sync')](): { tag: string; buffer: SharedArrayBuffer; byteOffset: number } { 51 + return { tag: 'AtomicUint16', buffer: this.view.buffer as SharedArrayBuffer, byteOffset: this.view.byteOffset }; 52 + } 47 53 } 54 + 55 + registerSync('AtomicUint16', AtomicUint16);
+8
src/sync/atomic-uint32.ts
··· 1 + import { registerSync } from './reconstruct.ts'; 2 + 1 3 export class AtomicUint32 { 2 4 static readonly byteSize = 4; 3 5 static readonly byteAlignment = 4; ··· 44 46 compareExchange(expected: number, replacement: number): number { 45 47 return Atomics.compareExchange(this.view, 0, expected, replacement); 46 48 } 49 + 50 + [Symbol.for('moroutine.sync')](): { tag: string; buffer: SharedArrayBuffer; byteOffset: number } { 51 + return { tag: 'AtomicUint32', buffer: this.view.buffer as SharedArrayBuffer, byteOffset: this.view.byteOffset }; 52 + } 47 53 } 54 + 55 + registerSync('AtomicUint32', AtomicUint32);
+8
src/sync/atomic-uint64.ts
··· 1 + import { registerSync } from './reconstruct.ts'; 2 + 1 3 export class AtomicUint64 { 2 4 static readonly byteSize = 8; 3 5 static readonly byteAlignment = 8; ··· 44 46 compareExchange(expected: bigint, replacement: bigint): bigint { 45 47 return Atomics.compareExchange(this.view, 0, expected, replacement); 46 48 } 49 + 50 + [Symbol.for('moroutine.sync')](): { tag: string; buffer: SharedArrayBuffer; byteOffset: number } { 51 + return { tag: 'AtomicUint64', buffer: this.view.buffer as SharedArrayBuffer, byteOffset: this.view.byteOffset }; 52 + } 47 53 } 54 + 55 + registerSync('AtomicUint64', AtomicUint64);
+8
src/sync/atomic-uint8.ts
··· 1 + import { registerSync } from './reconstruct.ts'; 2 + 1 3 export class AtomicUint8 { 2 4 static readonly byteSize = 1; 3 5 static readonly byteAlignment = 1; ··· 44 46 compareExchange(expected: number, replacement: number): number { 45 47 return Atomics.compareExchange(this.view, 0, expected, replacement); 46 48 } 49 + 50 + [Symbol.for('moroutine.sync')](): { tag: string; buffer: SharedArrayBuffer; byteOffset: number } { 51 + return { tag: 'AtomicUint8', buffer: this.view.buffer as SharedArrayBuffer, byteOffset: this.view.byteOffset }; 52 + } 47 53 } 54 + 55 + registerSync('AtomicUint8', AtomicUint8);
+8
src/sync/mutex.ts
··· 1 + import { registerSync } from './reconstruct.ts'; 2 + 1 3 const UNLOCKED = 0; 2 4 const LOCKED = 1; 3 5 ··· 42 44 Atomics.store(this.view, 0, UNLOCKED); 43 45 Atomics.notify(this.view, 0, 1); 44 46 } 47 + 48 + [Symbol.for('moroutine.sync')](): { tag: string; buffer: SharedArrayBuffer; byteOffset: number } { 49 + return { tag: 'Mutex', buffer: this.view.buffer as SharedArrayBuffer, byteOffset: this.view.byteOffset }; 50 + } 45 51 } 52 + 53 + registerSync('Mutex', Mutex);
+24
src/sync/reconstruct.ts
··· 1 + const SYNC = Symbol.for('moroutine.sync'); 2 + 3 + const registry = new Map<string, new (buffer: SharedArrayBuffer, byteOffset: number) => any>(); 4 + 5 + export function registerSync(tag: string, ctor: new (buffer: SharedArrayBuffer, byteOffset: number) => any): void { 6 + registry.set(tag, ctor); 7 + } 8 + 9 + export function serializeArg(arg: unknown): unknown { 10 + if (typeof arg === 'object' && arg !== null && SYNC in arg) { 11 + const sync = (arg as any)[SYNC](); 12 + return { __sync__: sync.tag, buffer: sync.buffer, byteOffset: sync.byteOffset }; 13 + } 14 + return arg; 15 + } 16 + 17 + export function deserializeArg(arg: unknown): unknown { 18 + if (typeof arg === 'object' && arg !== null && '__sync__' in arg) { 19 + const data = arg as { __sync__: string; buffer: SharedArrayBuffer; byteOffset: number }; 20 + const ctor = registry.get(data.__sync__); 21 + if (ctor) return new ctor(data.buffer, data.byteOffset); 22 + } 23 + return arg; 24 + }
+8
src/sync/rwlock.ts
··· 1 + import { registerSync } from './reconstruct.ts'; 2 + 1 3 const UNLOCKED = 0; 2 4 const WRITE_LOCKED = -1; 3 5 ··· 84 86 // Wake all waiters — both readers and writers may be waiting 85 87 Atomics.notify(this.view, 0, +Infinity); 86 88 } 89 + 90 + [Symbol.for('moroutine.sync')](): { tag: string; buffer: SharedArrayBuffer; byteOffset: number } { 91 + return { tag: 'RwLock', buffer: this.view.buffer as SharedArrayBuffer, byteOffset: this.view.byteOffset }; 92 + } 87 93 } 94 + 95 + registerSync('RwLock', RwLock);
+4 -2
src/worker-entry.ts
··· 1 1 import { parentPort } from 'node:worker_threads'; 2 2 import { registry } from './registry.ts'; 3 + import { deserializeArg, serializeArg } from './sync/reconstruct.ts'; 3 4 4 5 const imported = new Set<string>(); 5 6 ··· 15 16 const fn = registry.get(id); 16 17 if (!fn) throw new Error(`Moroutine not found: ${id}`); 17 18 18 - const value = await fn(...args); 19 - parentPort!.postMessage({ callId, value }); 19 + const deserializedArgs = args.map(deserializeArg); 20 + const value = await fn(...deserializedArgs); 21 + parentPort!.postMessage({ callId, value: serializeArg(value) }); 20 22 } catch (err) { 21 23 const message = err instanceof Error ? err.message : String(err); 22 24 parentPort!.postMessage({ callId, error: message });