Offload functions to worker threads with shared memory primitives for Node.js.
1import { registerSync } from './reconstruct.ts';
2
/** States of the 32-bit lock word that the mutex stores in shared memory. */
const UNLOCKED = 0, // lock word value when the mutex is free
  LOCKED = 1; // lock word value while the mutex is held
5
/**
 * Scope guard for a held mutex: disposing it calls `unlock()` on the
 * associated mutex.
 *
 * Disposal is idempotent. Only the first `[Symbol.dispose]()` call forwards to
 * `unlock()`; later calls are no-ops, so disposing a guard twice (for example
 * manually and again via `using`) cannot release a lock that has since been
 * acquired by someone else.
 */
export class MutexGuard {
  private readonly mutex: Mutex;
  /** True once this guard has released its mutex. */
  private released = false;

  /**
   * @param mutex The mutex to unlock when this guard is disposed.
   */
  constructor(mutex: Mutex) {
    this.mutex = mutex;
  }

  /** Unlocks the associated mutex the first time it is called; no-op afterwards. */
  [Symbol.dispose](): void {
    if (this.released) {
      return;
    }
    // Flip the flag before unlocking so a re-entrant dispose cannot unlock twice.
    this.released = true;
    this.mutex.unlock();
  }
}
18
/**
 * Asynchronous mutual-exclusion lock whose entire state is one 32-bit word in a
 * SharedArrayBuffer, so every `Mutex` constructed over the same buffer and
 * offset contends for the same lock.
 */
export class Mutex {
  /** Bytes of shared memory occupied by one mutex. */
  static readonly byteSize = 4;
  /** Alignment required for the mutex's byte offset (it is viewed as an Int32). */
  static readonly byteAlignment = 4;
  /** One-element view over the lock word. */
  private readonly view: Int32Array;

  /**
   * @param buffer Shared memory holding the lock word; a fresh buffer is
   *   allocated when omitted.
   * @param byteOffset Position of the lock word inside `buffer`; defaults to 0.
   */
  constructor(buffer?: SharedArrayBuffer, byteOffset?: number) {
    this.view = new Int32Array(buffer ?? new SharedArrayBuffer(Mutex.byteSize), byteOffset ?? 0, 1);
  }

  /**
   * Takes the lock, suspending asynchronously for as long as it is held.
   * @returns A disposable {@link MutexGuard} whose disposal releases the lock.
   */
  async lock(): Promise<MutexGuard> {
    // Each iteration attempts the UNLOCKED -> LOCKED transition; a failed
    // attempt means the word was already LOCKED.
    while (Atomics.compareExchange(this.view, 0, UNLOCKED, LOCKED) !== UNLOCKED) {
      // Sleep only while the word still reads LOCKED. If it changed in the
      // meantime, waitAsync completes synchronously and we retry at once.
      const pending = Atomics.waitAsync(this.view, 0, LOCKED);
      if (pending.async) {
        await pending.value;
      }
    }
    return new MutexGuard(this);
  }

  /** Marks the lock free, then wakes at most one waiter so it can retry. */
  unlock(): void {
    Atomics.store(this.view, 0, UNLOCKED);
    Atomics.notify(this.view, 0, 1);
  }

  /**
   * Describes this mutex as a plain record: the `'Mutex'` tag plus the shared
   * buffer and byte offset of its lock word.
   */
  [Symbol.for('moroutine.shared')](): { tag: string; buffer: SharedArrayBuffer; byteOffset: number } {
    const { buffer, byteOffset } = this.view;
    return { tag: 'Mutex', buffer: buffer as SharedArrayBuffer, byteOffset };
  }
}
59
// Register the Mutex class under the 'Mutex' tag, the same tag its
// `moroutine.shared` descriptor reports. Presumably this lets the code in
// './reconstruct.ts' rebuild a Mutex from such a descriptor — confirm there.
registerSync('Mutex', Mutex);