Offload functions to worker threads with shared memory primitives for Node.js.
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: dispatchStream returns StreamDispatch with done promise

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

+42 -26
+2 -1
src/dedicated-runner.ts
··· 22 22 23 23 export function runStreamOnDedicated<T>(id: string, args: unknown[]): AsyncIterable<T> { 24 24 const worker = getWorker(id); 25 - return dispatchStream<T>(worker, id, args); 25 + const { iterable } = dispatchStream<T>(worker, id, args); 26 + return iterable; 26 27 }
+38 -24
src/execute.ts
··· 143 143 }); 144 144 } 145 145 146 + export interface StreamDispatch<T> { 147 + iterable: AsyncIterable<T>; 148 + done: Promise<void>; 149 + } 150 + 146 151 export function dispatchStream<T>( 147 152 worker: Worker, 148 153 id: string, 149 154 args: unknown[], 150 155 opts?: ChannelOptions, 151 - ): AsyncIterable<T> { 156 + ): StreamDispatch<T> { 152 157 const url = id.slice(0, id.lastIndexOf('#')); 153 158 freezeModule(url); 154 159 const highWater = opts?.highWaterMark ?? DEFAULT_HIGH_WATER; ··· 164 169 worker.ref(); 165 170 worker.postMessage(msg, [...extracted.transfer, ...ports, port2] as any[]); 166 171 172 + let resolveDone: () => void; 173 + const donePromise = new Promise<void>((r) => { resolveDone = r; }); 174 + 167 175 const queue: T[] = []; 168 176 let done = false; 169 177 let error: Error | null = null; ··· 184 192 done = true; 185 193 port1.close(); 186 194 unrefWorkerOnce(); 195 + resolveDone!(); 187 196 if (waiting) { 188 197 waiting(); 189 198 waiting = null; ··· 194 203 done = true; 195 204 port1.close(); 196 205 unrefWorkerOnce(); 206 + resolveDone!(); 197 207 if (waiting) { 198 208 waiting(); 199 209 waiting = null; ··· 213 223 port1.unref(); 214 224 215 225 return { 216 - [Symbol.asyncIterator]() { 217 - return { 218 - async next(): Promise<IteratorResult<T>> { 219 - while (true) { 220 - if (queue.length > 0) { 221 - const value = queue.shift()!; 222 - if (paused && queue.length <= LOW_WATER) { 223 - paused = false; 224 - port1.postMessage('resume'); 226 + iterable: { 227 + [Symbol.asyncIterator]() { 228 + return { 229 + async next(): Promise<IteratorResult<T>> { 230 + while (true) { 231 + if (queue.length > 0) { 232 + const value = queue.shift()!; 233 + if (paused && queue.length <= LOW_WATER) { 234 + paused = false; 235 + port1.postMessage('resume'); 236 + } 237 + return { done: false, value }; 225 238 } 226 - return { done: false, value }; 239 + if (error) throw error; 240 + if (done) return { done: true, value: undefined }; 241 + await new Promise<void>((resolve) => { 242 + waiting = resolve; 243 + }); 227 244 } 228 - if (error) throw error; 229 - if (done) return { done: true, value: undefined }; 230 - await new Promise<void>((resolve) => { 231 - waiting = resolve; 232 - }); 233 - } 234 - }, 235 - async return(): Promise<IteratorResult<T>> { 236 - port1.close(); 237 - unrefWorkerOnce(); 238 - return { done: true, value: undefined }; 239 - }, 240 - }; 245 + }, 246 + async return(): Promise<IteratorResult<T>> { 247 + port1.close(); 248 + unrefWorkerOnce(); 249 + resolveDone!(); 250 + return { done: true, value: undefined }; 251 + }, 252 + }; 253 + }, 241 254 }, 255 + done: donePromise, 242 256 }; 243 257 }
+2 -1
src/worker-pool.ts
··· 40 40 if (disposed) throw new Error('Worker pool is disposed'); 41 41 const worker = pool[next % pool.length]; 42 42 next++; 43 - return dispatchStream(worker, taskOrTasks.id, taskOrTasks.args, opts); 43 + const { iterable } = dispatchStream(worker, taskOrTasks.id, taskOrTasks.args, opts); 44 + return iterable; 44 45 } 45 46 if (Array.isArray(taskOrTasks)) { 46 47 return Promise.all(taskOrTasks.map((t) => dispatch(t)));