Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at e2cebd539403475e2f4e79d55ca1a84a0ce3510d 181 lines 5.3 kB view raw
1import { parentPort, MessagePort } from 'node:worker_threads'; 2import type { Transferable } from 'node:worker_threads'; 3import { registry } from './registry.ts'; 4import { deserializeArg, serializeArg } from './shared/reconstruct.ts'; 5import { collectTransferables } from './transfer.ts'; 6 7const imported = new Set<string>(); 8const taskCache = new Map<number, unknown>(); 9 10function isTaskArg(arg: unknown): arg is { __task__: number; id: string; args: unknown[] } { 11 return typeof arg === 'object' && arg !== null && '__task__' in arg; 12} 13 14function portToAsyncIterable<T>(port: MessagePort): AsyncIterable<T> { 15 const queue: T[] = []; 16 let done = false; 17 let error: Error | null = null; 18 let paused = false; 19 let waiting: (() => void) | null = null; 20 21 const HIGH_WATER = 16; 22 23 port.on('message', (msg: { value?: unknown; done?: boolean; error?: Error }) => { 24 if (msg.error) { 25 error = msg.error; 26 done = true; 27 if (waiting) { 28 waiting(); 29 waiting = null; 30 } 31 return; 32 } 33 if (msg.done) { 34 done = true; 35 if (waiting) { 36 waiting(); 37 waiting = null; 38 } 39 return; 40 } 41 queue.push(deserializeArg(msg.value) as T); 42 if (waiting) { 43 waiting(); 44 waiting = null; 45 } 46 if (!paused && queue.length >= HIGH_WATER) { 47 paused = true; 48 port.postMessage('pause'); 49 } 50 }); 51 52 return { 53 [Symbol.asyncIterator]() { 54 return { 55 async next(): Promise<IteratorResult<T>> { 56 while (true) { 57 if (queue.length > 0) { 58 const value = queue.shift()!; 59 if (paused && queue.length <= 1) { 60 paused = false; 61 port.postMessage('resume'); 62 } 63 return { done: false, value }; 64 } 65 if (error) throw error; 66 if (done) return { done: true, value: undefined }; 67 await new Promise<void>((resolve) => { 68 waiting = resolve; 69 }); 70 } 71 }, 72 async return(): Promise<IteratorResult<T>> { 73 port.close(); 74 return { done: true, value: undefined }; 75 }, 76 }; 77 }, 78 }; 79} 80 81async function resolveArg(arg: unknown): Promise<unknown> { 82 if (arg instanceof MessagePort) { 83 return portToAsyncIterable(arg); 84 } 85 if (isTaskArg(arg)) { 86 if (taskCache.has(arg.__task__)) { 87 return taskCache.get(arg.__task__); 88 } 89 // Resolve the task's own args recursively 90 const resolvedArgs = await Promise.all(arg.args.map(resolveArg)); 91 // Import the module and run the function 92 const url = arg.id.slice(0, arg.id.lastIndexOf('#')); 93 if (!imported.has(url)) { 94 await import(url); 95 imported.add(url); 96 } 97 const fn = registry.get(arg.id); 98 if (!fn) throw new Error(`Moroutine not found: ${arg.id}`); 99 const value = await fn(...resolvedArgs); 100 taskCache.set(arg.__task__, value); 101 return value; 102 } 103 return deserializeArg(arg); 104} 105 106parentPort!.on('message', async (msg: { callId?: number; id: string; args: unknown[]; port?: MessagePort }) => { 107 const { id, args, port } = msg; 108 try { 109 const url = id.slice(0, id.lastIndexOf('#')); 110 if (!imported.has(url)) { 111 await import(url); 112 imported.add(url); 113 } 114 115 const fn = registry.get(id); 116 if (!fn) throw new Error(`Moroutine not found: ${id}`); 117 118 const resolvedArgs = await Promise.all(args.map(resolveArg)); 119 120 if (port) { 121 let paused = false; 122 let resumed: (() => void) | null = null; 123 let cancelled = false; 124 125 port.on('message', (signal: string) => { 126 if (signal === 'pause') { 127 paused = true; 128 } else if (signal === 'resume') { 129 paused = false; 130 if (resumed) { 131 resumed(); 132 resumed = null; 133 } 134 } 135 }); 136 137 port.on('close', () => { 138 cancelled = true; 139 if (resumed) { 140 resumed(); 141 resumed = null; 142 } 143 }); 144 145 try { 146 const gen = fn(...resolvedArgs) as AsyncGenerator; 147 for await (const value of gen) { 148 if (cancelled) break; 149 if (paused) 150 await new Promise<void>((r) => { 151 resumed = r; 152 }); 153 if (cancelled) break; 154 const serialized = serializeArg(value); 155 const transferList: Transferable[] = []; 156 collectTransferables(value, transferList); 157 port.postMessage({ value: serialized, done: false }, transferList); 158 await new Promise((r) => setImmediate(r)); 159 } 160 if (!cancelled) port.postMessage({ done: true }); 161 } catch (err) { 162 if (!cancelled) { 163 port.postMessage({ done: true, error: err instanceof Error ? err : new Error(String(err)) }); 164 } 165 } 166 try { 167 port.close(); 168 } catch {} 169 return; 170 } 171 172 const callId = msg.callId!; 173 const value = await fn(...resolvedArgs); 174 const returnValue = serializeArg(value); 175 const transferList: Transferable[] = []; 176 collectTransferables(value, transferList); 177 parentPort!.postMessage({ callId, value: returnValue }, transferList); 178 } catch (err) { 179 parentPort!.postMessage({ callId: msg.callId!, error: err instanceof Error ? err : new Error(String(err)) }); 180 } 181});