Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: RwLock with async locking and disposable guards

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Devin Ivy 115e2fb7 d65d389c

+216
+3
src/index.ts
··· 14 14 AtomicUint64, 15 15 Mutex, 16 16 MutexGuard, 17 + RwLock, 18 + ReadGuard, 19 + WriteGuard, 17 20 } from './sync/index.ts';
+1
src/sync/index.ts
··· 8 8 export { AtomicInt64 } from './atomic-int64.ts'; 9 9 export { AtomicUint64 } from './atomic-uint64.ts'; 10 10 export { Mutex, MutexGuard } from './mutex.ts'; 11 + export { RwLock, ReadGuard, WriteGuard } from './rwlock.ts';
+86
src/sync/rwlock.ts
··· 1 + const UNLOCKED = 0; 2 + const WRITE_LOCKED = -1; 3 + 4 + export class ReadGuard { 5 + private readonly rwlock: RwLock; 6 + 7 + constructor(rwlock: RwLock) { 8 + this.rwlock = rwlock; 9 + } 10 + 11 + [Symbol.dispose](): void { 12 + this.rwlock.readUnlock(); 13 + } 14 + } 15 + 16 + export class WriteGuard { 17 + private readonly rwlock: RwLock; 18 + 19 + constructor(rwlock: RwLock) { 20 + this.rwlock = rwlock; 21 + } 22 + 23 + [Symbol.dispose](): void { 24 + this.rwlock.writeUnlock(); 25 + } 26 + } 27 + 28 + export class RwLock { 29 + static readonly byteSize = 4; 30 + private readonly view: Int32Array; 31 + 32 + constructor(buffer?: SharedArrayBuffer, byteOffset?: number) { 33 + const buf = buffer ?? new SharedArrayBuffer(4); 34 + const offset = byteOffset ?? 0; 35 + this.view = new Int32Array(buf, offset, 1); 36 + } 37 + 38 + async readLock(): Promise<ReadGuard> { 39 + while (true) { 40 + const state = Atomics.load(this.view, 0); 41 + // Can acquire if not write-locked 42 + if (state >= UNLOCKED) { 43 + if (Atomics.compareExchange(this.view, 0, state, state + 1) === state) { 44 + return new ReadGuard(this); 45 + } 46 + continue; // CAS failed, retry immediately 47 + } 48 + // Write-locked, wait for change 49 + const result = Atomics.waitAsync(this.view, 0, state); 50 + if (result.async) { 51 + await result.value; 52 + } 53 + } 54 + } 55 + 56 + readUnlock(): void { 57 + const prev = Atomics.sub(this.view, 0, 1); 58 + if (prev === 1) { 59 + // Last reader, wake a waiting writer 60 + Atomics.notify(this.view, 0); 61 + } 62 + } 63 + 64 + async writeLock(): Promise<WriteGuard> { 65 + while (true) { 66 + // Can only acquire from unlocked state 67 + if (Atomics.compareExchange(this.view, 0, UNLOCKED, WRITE_LOCKED) === UNLOCKED) { 68 + return new WriteGuard(this); 69 + } 70 + // Someone holds the lock, wait for change 71 + const state = Atomics.load(this.view, 0); 72 + if (state !== UNLOCKED) { 73 + const result = Atomics.waitAsync(this.view, 0, state); 74 + if (result.async) { 75 + await result.value; 76 + } 77 + } 78 + } 79 + } 80 + 81 + writeUnlock(): void { 82 + Atomics.store(this.view, 0, UNLOCKED); 83 + // Wake all waiters — both readers and writers may be waiting 84 + Atomics.notify(this.view, 0, +Infinity); 85 + } 86 + }
+126
test/sync/rwlock.test.ts
··· 1 + import { describe, it } from 'node:test'; 2 + import assert from 'node:assert/strict'; 3 + import { RwLock } from 'moroutine'; 4 + 5 + describe('RwLock', () => { 6 + it('read lock and unlock', async () => { 7 + const rw = new RwLock(); 8 + const guard = await rw.readLock(); 9 + assert.equal(typeof guard[Symbol.dispose], 'function'); 10 + rw.readUnlock(); 11 + }); 12 + 13 + it('write lock and unlock', async () => { 14 + const rw = new RwLock(); 15 + const guard = await rw.writeLock(); 16 + assert.equal(typeof guard[Symbol.dispose], 'function'); 17 + rw.writeUnlock(); 18 + }); 19 + 20 + it('multiple readers can hold the lock concurrently', async () => { 21 + const rw = new RwLock(); 22 + const g1 = await rw.readLock(); 23 + const g2 = await rw.readLock(); 24 + // Both acquired — success 25 + rw.readUnlock(); 26 + rw.readUnlock(); 27 + }); 28 + 29 + it('writer excludes readers', async () => { 30 + const rw = new RwLock(); 31 + const order: string[] = []; 32 + 33 + await rw.writeLock(); 34 + order.push('write-start'); 35 + 36 + const readerDone = (async () => { 37 + const g = await rw.readLock(); 38 + order.push('read'); 39 + rw.readUnlock(); 40 + })(); 41 + 42 + await new Promise((r) => setTimeout(r, 20)); 43 + order.push('write-end'); 44 + rw.writeUnlock(); 45 + 46 + await readerDone; 47 + assert.deepEqual(order, ['write-start', 'write-end', 'read']); 48 + }); 49 + 50 + it('writer excludes other writers', async () => { 51 + const rw = new RwLock(); 52 + const order: string[] = []; 53 + 54 + await rw.writeLock(); 55 + order.push('w1-start'); 56 + 57 + const writer2Done = (async () => { 58 + const g = await rw.writeLock(); 59 + order.push('w2'); 60 + rw.writeUnlock(); 61 + })(); 62 + 63 + await new Promise((r) => setTimeout(r, 20)); 64 + order.push('w1-end'); 65 + rw.writeUnlock(); 66 + 67 + await writer2Done; 68 + assert.deepEqual(order, ['w1-start', 'w1-end', 'w2']); 69 + }); 70 + 71 + it('ReadGuard dispose calls readUnlock', async () => { 72 + const rw = new RwLock(); 73 + const guard = await rw.readLock(); 74 + guard[Symbol.dispose](); 75 + const wg = await rw.writeLock(); 76 + rw.writeUnlock(); 77 + }); 78 + 79 + it('WriteGuard dispose calls writeUnlock', async () => { 80 + const rw = new RwLock(); 81 + const guard = await rw.writeLock(); 82 + guard[Symbol.dispose](); 83 + const rg = await rw.readLock(); 84 + rw.readUnlock(); 85 + }); 86 + 87 + it('self-allocates', () => { 88 + const rw = new RwLock(); 89 + assert.ok(rw); 90 + }); 91 + 92 + it('accepts an external buffer and byteOffset', () => { 93 + const buffer = new SharedArrayBuffer(16); 94 + const rw = new RwLock(buffer, 4); 95 + assert.ok(rw); 96 + }); 97 + 98 + it('exposes static byteSize of 4', () => { 99 + assert.equal(RwLock.byteSize, 4); 100 + }); 101 + 102 + it('wakes all waiting readers when writer unlocks', async () => { 103 + const rw = new RwLock(); 104 + 105 + await rw.writeLock(); 106 + 107 + const reader1Done = (async () => { 108 + const g = await rw.readLock(); 109 + rw.readUnlock(); 110 + return 'r1'; 111 + })(); 112 + 113 + const reader2Done = (async () => { 114 + const g = await rw.readLock(); 115 + rw.readUnlock(); 116 + return 'r2'; 117 + })(); 118 + 119 + // Let readers queue up 120 + await new Promise((r) => setTimeout(r, 20)); 121 + rw.writeUnlock(); 122 + 123 + const results = await Promise.all([reader1Done, reader2Done]); 124 + assert.deepEqual(results.sort(), ['r1', 'r2']); 125 + }); 126 + });