Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at e46aca05dbf9a2204886d23f7ae3b34883f4e5c6 430 lines 16 kB view raw
1import type { Loadable } from './loadable.ts'; 2import type { Descriptor, BytesDescriptor, StringDescriptor } from './descriptors.ts'; 3import { int32, int64, bool } from './descriptors.ts'; 4import { Int32 } from './int32.ts'; 5import { Int64 } from './int64.ts'; 6import { Bool } from './bool.ts'; 7import { Bytes } from './bytes.ts'; 8import { SharedString } from './string.ts'; 9import { SharedStruct } from './shared-struct.ts'; 10import { Tuple } from './tuple.ts'; 11 12function isDescriptor(value: unknown): value is Descriptor<unknown> { 13 return typeof value === 'function' && 'byteSize' in value && '_class' in value; 14} 15 16function isBytesDescriptor(value: unknown): value is BytesDescriptor { 17 return typeof value === 'object' && value !== null && '_size' in value && '_class' in value; 18} 19 20function isStringDescriptor(value: unknown): value is StringDescriptor { 21 return typeof value === 'object' && value !== null && '_maxBytes' in value && '_class' in value; 22} 23 24function isStructSchema(value: unknown): value is Record<string, unknown> { 25 return typeof value === 'object' && value !== null && !Array.isArray(value); 26} 27 28function align(offset: number, alignment: number): number { 29 const remainder = offset % alignment; 30 return remainder === 0 ? offset : offset + (alignment - remainder); 31} 32 33function resolveValue(value: unknown): { descriptor: Descriptor<unknown>; initialValue: unknown } | null { 34 if (typeof value === 'number') { 35 if (!Number.isInteger(value) || value > 2_147_483_647 || value < -2_147_483_648) { 36 throw new RangeError(`Value ${value} out of range for int32. Use int64 with bigint instead.`); 37 } 38 return { descriptor: int32 as unknown as Descriptor<unknown>, initialValue: value }; 39 } 40 if (typeof value === 'bigint') { 41 return { descriptor: int64 as unknown as Descriptor<unknown>, initialValue: value }; 42 } 43 if (typeof value === 'boolean') { 44 return { descriptor: bool as unknown as Descriptor<unknown>, initialValue: value }; 45 } 46 return null; 47} 48 49interface LeafEntry { 50 path: string[]; 51 descriptor: Descriptor<unknown>; 52 offset: number; 53 initialValue?: unknown; 54 bytesSize?: number; 55 stringMaxBytes?: number; 56} 57 58interface ArrayLeafEntry { 59 type: 'leaf'; 60 descriptor: Descriptor<unknown>; 61 offset: number; 62 initialValue?: unknown; 63 bytesSize?: number; 64 stringMaxBytes?: number; 65} 66 67interface ArrayStructEntry { 68 type: 'struct'; 69 schema: Record<string, unknown>; 70 leaves: LeafEntry[]; 71} 72 73interface ArrayTupleEntry { 74 type: 'tuple'; 75 entries: ArrayEntry[]; 76} 77 78type ArrayEntry = ArrayLeafEntry | ArrayStructEntry | ArrayTupleEntry; 79 80function collectLeaves( 81 schema: Record<string, unknown>, 82 path: string[], 83 leaves: LeafEntry[], 84 cursor: { offset: number }, 85): void { 86 for (const key in schema) { 87 const value = schema[key]; 88 if (isStringDescriptor(value)) { 89 cursor.offset = align(cursor.offset, value.byteAlignment); 90 leaves.push({ 91 path: [...path, key], 92 descriptor: value as unknown as Descriptor<unknown>, 93 offset: cursor.offset, 94 stringMaxBytes: value._maxBytes, 95 }); 96 cursor.offset += value.byteSize; 97 } else if (isBytesDescriptor(value)) { 98 cursor.offset = align(cursor.offset, value.byteAlignment); 99 leaves.push({ 100 path: [...path, key], 101 descriptor: value as unknown as Descriptor<unknown>, 102 offset: cursor.offset, 103 bytesSize: value._size, 104 }); 105 cursor.offset += value.byteSize; 106 } else if (isDescriptor(value)) { 107 cursor.offset = align(cursor.offset, value.byteAlignment); 108 leaves.push({ path: [...path, key], descriptor: value, offset: cursor.offset }); 109 cursor.offset += value.byteSize; 110 } else if (Array.isArray(value)) { 111 collectArrayLeaves(value, leaves, cursor); 112 } else if (isStructSchema(value)) { 113 collectLeaves(value as Record<string, unknown>, [...path, key], leaves, cursor); 114 } else { 115 const resolved = resolveValue(value); 116 if (resolved !== null) { 117 const { descriptor, initialValue } = resolved; 118 cursor.offset = align(cursor.offset, descriptor.byteAlignment); 119 leaves.push({ path: [...path, key], descriptor, offset: cursor.offset, initialValue }); 120 cursor.offset += descriptor.byteSize; 121 } 122 } 123 } 124} 125 126function collectArrayLeaves(schema: unknown[], leaves: LeafEntry[], cursor: { offset: number }): void { 127 for (const element of schema) { 128 if (isStringDescriptor(element)) { 129 cursor.offset = align(cursor.offset, element.byteAlignment); 130 leaves.push({ 131 path: [], 132 descriptor: element as unknown as Descriptor<unknown>, 133 offset: cursor.offset, 134 stringMaxBytes: element._maxBytes, 135 }); 136 cursor.offset += element.byteSize; 137 } else if (isBytesDescriptor(element)) { 138 cursor.offset = align(cursor.offset, element.byteAlignment); 139 leaves.push({ 140 path: [], 141 descriptor: element as unknown as Descriptor<unknown>, 142 offset: cursor.offset, 143 bytesSize: element._size, 144 }); 145 cursor.offset += element.byteSize; 146 } else if (isDescriptor(element)) { 147 cursor.offset = align(cursor.offset, element.byteAlignment); 148 leaves.push({ path: [], descriptor: element, offset: cursor.offset }); 149 cursor.offset += element.byteSize; 150 } else if (Array.isArray(element)) { 151 collectArrayLeaves(element, leaves, cursor); 152 } else if (isStructSchema(element)) { 153 collectLeaves(element as Record<string, unknown>, [], leaves, cursor); 154 } else { 155 const resolved = resolveValue(element); 156 if (resolved !== null) { 157 const { descriptor, initialValue } = resolved; 158 cursor.offset = align(cursor.offset, descriptor.byteAlignment); 159 leaves.push({ path: [], descriptor, offset: cursor.offset, initialValue }); 160 cursor.offset += descriptor.byteSize; 161 } 162 } 163 } 164} 165 166function processArraySchema(schema: unknown[], cursor: { offset: number }): { entries: ArrayEntry[] } { 167 const entries: ArrayEntry[] = []; 168 for (const element of schema) { 169 if (isStringDescriptor(element)) { 170 cursor.offset = align(cursor.offset, element.byteAlignment); 171 entries.push({ 172 type: 'leaf', 173 descriptor: element as unknown as Descriptor<unknown>, 174 offset: cursor.offset, 175 stringMaxBytes: element._maxBytes, 176 }); 177 cursor.offset += element.byteSize; 178 } else if (isBytesDescriptor(element)) { 179 cursor.offset = align(cursor.offset, element.byteAlignment); 180 entries.push({ 181 type: 'leaf', 182 descriptor: element as unknown as Descriptor<unknown>, 183 offset: cursor.offset, 184 bytesSize: element._size, 185 }); 186 cursor.offset += element.byteSize; 187 } else if (isDescriptor(element)) { 188 cursor.offset = align(cursor.offset, element.byteAlignment); 189 entries.push({ type: 'leaf', descriptor: element, offset: cursor.offset }); 190 cursor.offset += element.byteSize; 191 } else if (Array.isArray(element)) { 192 const nested = processArraySchema(element, cursor); 193 entries.push({ type: 'tuple', entries: nested.entries }); 194 } else if (isStructSchema(element)) { 195 const leaves: LeafEntry[] = []; 196 collectLeaves(element as Record<string, unknown>, [], leaves, cursor); 197 entries.push({ type: 'struct', schema: element as Record<string, unknown>, leaves }); 198 } else { 199 const resolved = resolveValue(element); 200 if (resolved !== null) { 201 const { descriptor, initialValue } = resolved; 202 cursor.offset = align(cursor.offset, descriptor.byteAlignment); 203 entries.push({ type: 'leaf', descriptor, offset: cursor.offset, initialValue }); 204 cursor.offset += descriptor.byteSize; 205 } 206 } 207 } 208 return { entries }; 209} 210 211function buildTupleFromEntries(entries: ArrayEntry[], buffer: SharedArrayBuffer): Tuple<any> { 212 const elements = entries.map((entry) => { 213 if (entry.type === 'leaf') { 214 if (entry.stringMaxBytes !== undefined) { 215 return new SharedString(entry.stringMaxBytes, buffer, entry.offset); 216 } 217 if (entry.bytesSize !== undefined) { 218 return new Bytes(entry.bytesSize, buffer, entry.offset); 219 } 220 const instance = new entry.descriptor._class(buffer, entry.offset); 221 if ('initialValue' in entry) { 222 (instance as any).store(entry.initialValue); 223 } 224 return instance; 225 } 226 if (entry.type === 'struct') { 227 const leafIndex = { i: 0 }; 228 return buildStructTree(entry.schema, entry.leaves, buffer, leafIndex); 229 } 230 if (entry.type === 'tuple') { 231 return buildTupleFromEntries(entry.entries, buffer); 232 } 233 }); 234 return new Tuple(elements as any); 235} 236 237function buildStructTree( 238 schema: Record<string, unknown>, 239 leaves: LeafEntry[], 240 buffer: SharedArrayBuffer, 241 leafIndex: { i: number }, 242): SharedStruct<any> { 243 const fields: Record<string, any> = {}; 244 for (const key in schema) { 245 const value = schema[key]; 246 if (isStringDescriptor(value)) { 247 const leaf = leaves[leafIndex.i++]; 248 fields[key] = new SharedString(leaf.stringMaxBytes!, buffer, leaf.offset); 249 } else if (isBytesDescriptor(value)) { 250 const leaf = leaves[leafIndex.i++]; 251 fields[key] = new Bytes(leaf.bytesSize!, buffer, leaf.offset); 252 } else if (isDescriptor(value)) { 253 const leaf = leaves[leafIndex.i++]; 254 fields[key] = new leaf.descriptor._class(buffer, leaf.offset); 255 } else if (Array.isArray(value)) { 256 const arrayEntries = processArraySchemaFromLeaves(value, leaves, leafIndex); 257 fields[key] = buildTupleFromEntries(arrayEntries, buffer); 258 } else if (isStructSchema(value)) { 259 fields[key] = buildStructTree(value as Record<string, unknown>, leaves, buffer, leafIndex); 260 } else if (resolveValue(value) !== null) { 261 const leaf = leaves[leafIndex.i++]; 262 const instance = new leaf.descriptor._class(buffer, leaf.offset) as any; 263 if ('initialValue' in leaf) { 264 instance.store(leaf.initialValue); 265 } 266 fields[key] = instance; 267 } 268 } 269 return new SharedStruct(fields); 270} 271 272function processArraySchemaFromLeaves(schema: unknown[], leaves: LeafEntry[], leafIndex: { i: number }): ArrayEntry[] { 273 const entries: ArrayEntry[] = []; 274 for (const element of schema) { 275 if (isStringDescriptor(element)) { 276 const leaf = leaves[leafIndex.i++]; 277 entries.push({ 278 type: 'leaf', 279 descriptor: leaf.descriptor, 280 offset: leaf.offset, 281 stringMaxBytes: leaf.stringMaxBytes, 282 }); 283 } else if (isBytesDescriptor(element)) { 284 const leaf = leaves[leafIndex.i++]; 285 entries.push({ type: 'leaf', descriptor: leaf.descriptor, offset: leaf.offset, bytesSize: leaf.bytesSize }); 286 } else if (isDescriptor(element)) { 287 const leaf = leaves[leafIndex.i++]; 288 entries.push({ type: 'leaf', descriptor: leaf.descriptor, offset: leaf.offset }); 289 } else if (Array.isArray(element)) { 290 const nested = processArraySchemaFromLeaves(element, leaves, leafIndex); 291 entries.push({ type: 'tuple', entries: nested }); 292 } else if (isStructSchema(element)) { 293 const startIndex = leafIndex.i; 294 // Count leaves for this struct 295 countStructLeaves(element as Record<string, unknown>, leaves, leafIndex); 296 const endIndex = leafIndex.i; 297 const subLeaves = leaves.slice(startIndex, endIndex); 298 entries.push({ 299 type: 'struct', 300 schema: element as Record<string, unknown>, 301 leaves: subLeaves, 302 }); 303 } else if (resolveValue(element) !== null) { 304 const leaf = leaves[leafIndex.i++]; 305 entries.push({ type: 'leaf', descriptor: leaf.descriptor, offset: leaf.offset, initialValue: leaf.initialValue }); 306 } 307 } 308 return entries; 309} 310 311function countStructLeaves(schema: Record<string, unknown>, leaves: LeafEntry[], leafIndex: { i: number }): void { 312 for (const key in schema) { 313 const value = schema[key]; 314 if (isStringDescriptor(value) || isBytesDescriptor(value) || isDescriptor(value)) { 315 leafIndex.i++; 316 } else if (Array.isArray(value)) { 317 countArrayLeaves(value, leaves, leafIndex); 318 } else if (isStructSchema(value)) { 319 countStructLeaves(value as Record<string, unknown>, leaves, leafIndex); 320 } else if (resolveValue(value) !== null) { 321 leafIndex.i++; 322 } 323 } 324} 325 326function countArrayLeaves(schema: unknown[], leaves: LeafEntry[], leafIndex: { i: number }): void { 327 for (const element of schema) { 328 if (isStringDescriptor(element) || isBytesDescriptor(element) || isDescriptor(element)) { 329 leafIndex.i++; 330 } else if (Array.isArray(element)) { 331 countArrayLeaves(element, leaves, leafIndex); 332 } else if (isStructSchema(element)) { 333 countStructLeaves(element as Record<string, unknown>, leaves, leafIndex); 334 } else if (resolveValue(element) !== null) { 335 leafIndex.i++; 336 } 337 } 338} 339 340// Maps a schema value to its resolved instance type 341type ResolveField<T> = 342 T extends Descriptor<infer R> 343 ? R 344 : T extends BytesDescriptor 345 ? Bytes 346 : T extends StringDescriptor 347 ? SharedString 348 : T extends readonly unknown[] 349 ? ResolveTuple<T> 350 : T extends Record<string, unknown> 351 ? ResolveStruct<T> 352 : T extends number 353 ? Int32 354 : T extends bigint 355 ? Int64 356 : T extends boolean 357 ? Bool 358 : never; 359 360type ResolveStruct<T extends Record<string, unknown>> = SharedStruct<{ [K in keyof T]: ResolveField<T[K]> }>; 361type ResolveTuple<T extends readonly unknown[]> = Tuple<ResolveTupleElements<T>>; 362type ResolveTupleElements<T extends readonly unknown[]> = { [K in keyof T]: ResolveField<T[K]> } & Loadable<any>[]; 363 364/** 365 * Allocates shared memory from a schema. Accepts descriptors, plain objects (structs), 366 * arrays (tuples), or primitive values (shorthand). Compound schemas pack all fields 367 * into a single SharedArrayBuffer. 368 * @param schema - A descriptor, plain object, array, or primitive value defining the shape. 369 * @returns A {@link Loadable} instance (or struct/tuple/lock) backed by shared memory. 370 */ 371export function shared<T extends Descriptor<any>>(schema: T): ReturnType<T>; 372export function shared(schema: BytesDescriptor): Bytes; 373export function shared(schema: StringDescriptor): SharedString; 374export function shared(schema: number): Int32; 375export function shared(schema: bigint): Int64; 376export function shared(schema: boolean): Bool; 377export function shared<T extends readonly unknown[]>(schema: [...T]): ResolveTuple<T>; 378export function shared<T extends Record<string, unknown>>(schema: T): ResolveStruct<T>; 379export function shared(schema: unknown): any { 380 if (typeof schema === 'number') { 381 if (!Number.isInteger(schema) || schema > 2_147_483_647 || schema < -2_147_483_648) { 382 throw new RangeError(`Value ${schema} out of range for int32. Use int64 with bigint instead.`); 383 } 384 const instance = new Int32(new SharedArrayBuffer(Int32.byteSize), 0); 385 instance.store(schema); 386 return instance; 387 } 388 389 if (typeof schema === 'bigint') { 390 const instance = new Int64(new SharedArrayBuffer(Int64.byteSize), 0); 391 instance.store(schema); 392 return instance; 393 } 394 395 if (typeof schema === 'boolean') { 396 const instance = new Bool(new SharedArrayBuffer(Bool.byteSize), 0); 397 instance.store(schema); 398 return instance; 399 } 400 401 if (isStringDescriptor(schema)) { 402 return schema; // already a standalone instance 403 } 404 405 if (isBytesDescriptor(schema)) { 406 return schema; // already a standalone instance 407 } 408 409 if (isDescriptor(schema)) { 410 return schema(); 411 } 412 413 if (Array.isArray(schema)) { 414 const cursor = { offset: 0 }; 415 const { entries } = processArraySchema(schema, cursor); 416 const buffer = new SharedArrayBuffer(cursor.offset); 417 return buildTupleFromEntries(entries, buffer); 418 } 419 420 if (isStructSchema(schema)) { 421 const leaves: LeafEntry[] = []; 422 const cursor = { offset: 0 }; 423 collectLeaves(schema as Record<string, unknown>, [], leaves, cursor); 424 const buffer = new SharedArrayBuffer(cursor.offset); 425 const leafIndex = { i: 0 }; 426 return buildStructTree(schema as Record<string, unknown>, leaves, buffer, leafIndex); 427 } 428 429 throw new Error('Invalid schema'); 430}