Offload functions to worker threads with shared memory primitives for Node.js.
1import { describe, it } from 'node:test';
2import assert from 'node:assert/strict';
3import { workers, int32atomic, mutex, rwlock, shared, int32, string, bytes } from 'moroutine';
4import { atomicAdd, mutexIncrement, rwlockRead } from '../fixtures/sync.ts';
5import { readValue, writeValue } from '../fixtures/shared-new.ts';
6
7describe('shared primitives across workers', () => {
8 it('Int32Atomic is shared across worker threads', async () => {
9 const counter = int32atomic();
10 const run = workers(2);
11 try {
12 await run([atomicAdd(counter, 10), atomicAdd(counter, 20)]);
13 assert.equal(counter.load(), 30);
14 } finally {
15 run[Symbol.dispose]();
16 }
17 });
18
19 it('Mutex serializes access across workers', async () => {
20 const m = mutex();
21 const counter = int32atomic();
22 const run = workers(2);
23 try {
24 await run([mutexIncrement(m, counter, 100), mutexIncrement(m, counter, 100)]);
25 assert.equal(counter.load(), 200);
26 } finally {
27 run[Symbol.dispose]();
28 }
29 });
30
31 it('RwLock allows concurrent reads from workers', async () => {
32 const rw = rwlock();
33 const counter = int32atomic();
34 counter.store(42);
35 const run = workers(2);
36 try {
37 const [a, b] = await run([rwlockRead(rw, counter), rwlockRead(rw, counter)]);
38 assert.equal(a, 42);
39 assert.equal(b, 42);
40 } finally {
41 run[Symbol.dispose]();
42 }
43 });
44
45 it('struct via shared() works across workers', async () => {
46 const point = shared({ x: int32, y: int32 });
47 point.store({ x: 1, y: 2 });
48
49 const run = workers(1);
50 try {
51 const result = await run(readValue(point));
52 assert.deepEqual(result, { x: 1, y: 2 });
53 } finally {
54 run[Symbol.dispose]();
55 }
56 });
57
58 it('worker can write to struct via shared()', async () => {
59 const point = shared({ x: int32, y: int32 });
60
61 const run = workers(1);
62 try {
63 await run(writeValue(point, { x: 10, y: 20 }));
64 assert.deepEqual(point.load(), { x: 10, y: 20 });
65 } finally {
66 run[Symbol.dispose]();
67 }
68 });
69
70 it('tuple via shared() works across workers', async () => {
71 const t = shared([int32, int32]);
72 t.store([1, 2]);
73
74 const run = workers(1);
75 try {
76 const result = await run(readValue(t));
77 assert.deepEqual(result, [1, 2]);
78 } finally {
79 run[Symbol.dispose]();
80 }
81 });
82
83 it('string via shared() works across workers', async () => {
84 const s = shared(string(32));
85 s.store('hello');
86
87 const run = workers(1);
88 try {
89 const result = await run(readValue(s));
90 assert.equal(result, 'hello');
91 } finally {
92 run[Symbol.dispose]();
93 }
94 });
95
96 it('bytes via shared() works across workers', async () => {
97 const b = shared(bytes(4));
98 b.store(new Uint8Array([1, 2, 3, 4]));
99
100 const run = workers(1);
101 try {
102 const result = (await run(readValue(b))) as Uint8Array;
103 assert.deepEqual([...result], [1, 2, 3, 4]);
104 } finally {
105 run[Symbol.dispose]();
106 }
107 });
108
109 it('nested struct with string across workers', async () => {
110 const entity = shared({ name: string(16), hp: int32 });
111 entity.store({ name: 'goblin', hp: 50 });
112
113 const run = workers(1);
114 try {
115 const result = await run(readValue(entity));
116 assert.deepEqual(result, { name: 'goblin', hp: 50 });
117 } finally {
118 run[Symbol.dispose]();
119 }
120 });
121});