import { bailableWait } from "./util.ts";

/**
 * Reads values from an async generator into a list.
 *
 * Reading stops as soon as one of the following happens:
 * - the generator reports it is done,
 * - `maxLength` values have been collected,
 * - `waitFor` has resolved AND `isDone` has returned `true`.
 *
 * NOTE: this does not signal the generator to close; it *will* continue to
 * produce values.
 *
 * @param gen Generator to read from.
 * @param isDone Polled with the most recently collected value (`undefined`
 *   when nothing has been collected yet). Polling only starts once `waitFor`
 *   has resolved and repeats after each `bailableWait(20)` pause.
 * @param waitFor Promise that must resolve before `isDone` is consulted.
 * @param maxLength Upper bound on the number of values collected.
 * @returns The values collected up to the point reading stopped.
 * @throws Whatever `gen.next()`, `waitFor` or `isDone` rejects with while the
 *   read loop is still running.
 */
export const readFromGenerator = async <T>(
  gen: AsyncGenerator<T>,
  isDone: (last?: T) => Promise<boolean> | boolean,
  waitFor: Promise<unknown> = Promise.resolve(),
  maxLength = Number.MAX_SAFE_INTEGER,
): Promise<T[]> => {
  const evts: T[] = [];
  let bail: undefined | (() => void);
  let hasBroke = false;

  // Polls `isDone` until it reports true (-> true) or the reader has already
  // stopped (-> false).
  const awaitDone = async (): Promise<boolean> => {
    while (!(await isDone(evts.at(-1)))) {
      const bailable = bailableWait(20);
      // Publish `bail` BEFORE waiting so the `finally` block below is able to
      // cut the in-flight wait short instead of a wait that already finished.
      bail = bailable.bail;
      await bailable.wait();
      if (hasBroke) return false;
    }
    return true;
  };

  // Resolves (with `undefined`) when reading should stop. Built as a plain
  // chain so that a rejection of `waitFor` or `isDone` surfaces through the
  // race below instead of becoming an unhandled rejection.
  const breakOn: Promise<void> = waitFor
    .then(() => awaitDone())
    .then(() => undefined);

  try {
    while (evts.length < maxLength) {
      const maybeEvt = await Promise.race([gen.next(), breakOn]);
      // `breakOn` won the race: it resolves with `undefined`.
      if (!maybeEvt) break;
      const evt = maybeEvt as IteratorResult<T>;
      if (evt.done) break;
      evts.push(evt.value);
    }
  } finally {
    hasBroke = true;
    bail?.();
  }
  return evts;
};

/** A promise together with the function that resolves it. */
export type Deferrable = {
  resolve: () => void;
  complete: Promise<void>;
};

/** Creates a {@link Deferrable} whose `complete` settles once `resolve` is called. */
export const createDeferrable = (): Deferrable => {
  // Assigned synchronously by the Promise executor below.
  let resolve!: () => void;
  const promise: Promise<void> = new Promise((res) => {
    resolve = () => res();
  });
  return { resolve, complete: promise };
};

/** Creates `count` independent {@link Deferrable}s. */
export const createDeferrables = (count: number): Deferrable[] => {
  const list: Deferrable[] = [];
  for (let i = 0; i < count; i++) {
    list.push(createDeferrable());
  }
  return list;
};

/** Resolves once every given deferrable has completed. */
export const allComplete = async (deferrables: Deferrable[]): Promise<void> => {
  await Promise.all(deferrables.map((d) => d.complete));
};

/**
 * In-memory queue that is filled with `push`/`pushMany` and drained through
 * the async generator returned by {@link AsyncBuffer.events}.
 */
export class AsyncBuffer<T> {
  private buffer: T[] = [];
  private promise: Promise<void>;
  private resolve: () => void;
  private closed = false;
  private toThrow: unknown | undefined;

  /**
   * @param maxSize When set to a non-zero value, `events()` throws an
   *   {@link AsyncBufferFullError} once more than `maxSize` items are pending.
   */
  constructor(public maxSize?: number) {
    // Initializing to satisfy types/build, immediately reset by resetPromise()
    this.promise = Promise.resolve();
    this.resolve = () => null;
    this.resetPromise();
  }

  /** The items currently pending in the buffer. */
  get curr(): T[] {
    return this.buffer;
  }

  /** Number of items currently pending. */
  get size(): number {
    return this.buffer.length;
  }

  /** Whether `close()` or `throw()` has been called. */
  get isClosed(): boolean {
    return this.closed;
  }

  /** Replaces the wake-up promise with a fresh, pending one. */
  resetPromise() {
    this.promise = new Promise<void>((r) => (this.resolve = r));
  }

  /** Appends one item and wakes the consumer. */
  push(item: T) {
    this.buffer.push(item);
    this.resolve();
  }

  /** Appends several items and wakes the consumer. */
  pushMany(items: T[]) {
    items.forEach((i) => this.buffer.push(i));
    this.resolve();
  }

  /**
   * Yields buffered items in insertion order, waiting for new ones when the
   * buffer is empty.
   *
   * Finishes once the buffer is closed and drained.
   *
   * @throws The error passed to `throw()`, if any.
   * @throws {AsyncBufferFullError} When `maxSize` is set and exceeded.
   */
  async *events(): AsyncGenerator<T> {
    while (true) {
      if (this.closed && this.buffer.length === 0) {
        if (this.toThrow) {
          throw this.toThrow;
        } else {
          return;
        }
      }
      await this.promise;
      if (this.toThrow) {
        throw this.toThrow;
      }
      if (this.maxSize && this.size > this.maxSize) {
        throw new AsyncBufferFullError(this.maxSize);
      }
      if (this.buffer.length === 0) {
        // Nothing left: arm a new promise so the next iteration waits.
        this.resetPromise();
        continue;
      }
      // Emptiness is decided by length, not by the truthiness of the head
      // item, so falsy values (0, "", false, null) are yielded like any other.
      const [first, ...rest] = this.buffer;
      this.buffer = rest;
      yield first as T;
    }
  }

  /** Closes the buffer and makes `events()` throw `err`. */
  throw(err: unknown) {
    this.toThrow = err;
    this.closed = true;
    this.resolve();
  }

  /** Closes the buffer; `events()` finishes once pending items are drained. */
  close() {
    this.closed = true;
    this.resolve();
  }
}

/** Thrown by `AsyncBuffer.events()` when the buffer exceeds its `maxSize`. */
export class AsyncBufferFullError extends Error {
  constructor(maxSize: number) {
    super(`ReachedMaxBufferSize: ${maxSize}`);
  }
}

/**
 * Utility function that behaves like {@link Promise.allSettled} but returns the
 * same result as {@link Promise.all} in case every promise is fulfilled, and
 * throws an {@link AggregateError} if there is more than one error. A single
 * error is rethrown as-is.
 */
export function allFulfilled<T extends readonly unknown[] | []>(
  promises: T,
): Promise<{ -readonly [P in keyof T]: Awaited<T[P]> }>;
export function allFulfilled<T>(
  promises: Iterable<T | PromiseLike<T>>,
): Promise<Awaited<T>[]>;
export function allFulfilled(
  promises: Iterable<Promise<unknown>>,
): Promise<unknown[]> {
  return Promise.allSettled(promises).then(handleAllSettledErrors);
}

/**
 * Unwraps the results of {@link Promise.allSettled}.
 *
 * @returns The fulfilled values, in order, when nothing was rejected.
 * @throws The rejection reason itself when exactly one result was rejected,
 *   or an {@link AggregateError} holding every reason when several were.
 */
export function handleAllSettledErrors<
  T extends readonly PromiseSettledResult<unknown>[] | [],
>(
  results: T,
): {
  -readonly [P in keyof T]: T[P] extends PromiseSettledResult<infer U>
    ? U
    : never;
};
export function handleAllSettledErrors<T>(
  results: PromiseSettledResult<T>[],
): T[];
export function handleAllSettledErrors(
  results: PromiseSettledResult<unknown>[],
): unknown[] {
  if (results.every(isFulfilledResult)) return results.map(extractValue);

  const errors = results.filter(isRejectedResult).map(extractReason);
  // Never throw a bare array: surface the lone reason unchanged (as
  // Promise.all would) and wrap multiple reasons in an AggregateError.
  throw errors.length === 1 ? errors[0] : new AggregateError(errors);
}

/** Type guard for the rejected variant of a settled result. */
export function isRejectedResult(
  result: PromiseSettledResult<unknown>,
): result is PromiseRejectedResult {
  return result.status === "rejected";
}

function extractReason(result: PromiseRejectedResult): unknown {
  return result.reason;
}

/** Type guard for the fulfilled variant of a settled result. */
export function isFulfilledResult<T>(
  result: PromiseSettledResult<T>,
): result is PromiseFulfilledResult<T> {
  return result.status === "fulfilled";
}

function extractValue<T>(result: PromiseFulfilledResult<T>): T {
  return result.value;
}