Suite of AT Protocol TypeScript libraries built on web standards
21
fork

Configure Feed

Select the types of activity you want to include in your feed.

at lex 210 lines 5.4 kB view raw
1import { bailableWait } from "./util.ts"; 2 3// reads values from a generator into a list 4// breaks when isDone signals `true` AND `waitFor` completes OR when a max length is reached 5// NOTE: does not signal generator to close. it *will* continue to produce values 6export const readFromGenerator = async <T>( 7 gen: AsyncGenerator<T>, 8 isDone: (last?: T) => Promise<boolean> | boolean, 9 waitFor: Promise<unknown> = Promise.resolve(), 10 maxLength = Number.MAX_SAFE_INTEGER, 11): Promise<T[]> => { 12 const evts: T[] = []; 13 let bail: undefined | (() => void); 14 let hasBroke = false; 15 const awaitDone = async () => { 16 if (await isDone(evts.at(-1))) { 17 return true; 18 } 19 const bailable = bailableWait(20); 20 await bailable.wait(); 21 bail = bailable.bail; 22 if (hasBroke) return false; 23 return await awaitDone(); 24 }; 25 const breakOn: Promise<void> = new Promise((resolve) => { 26 waitFor.then(() => { 27 awaitDone().then(() => resolve()); 28 }); 29 }); 30 31 try { 32 while (evts.length < maxLength) { 33 const maybeEvt = await Promise.race([gen.next(), breakOn]); 34 if (!maybeEvt) break; 35 const evt = maybeEvt as IteratorResult<T>; 36 if (evt.done) break; 37 evts.push(evt.value); 38 } 39 } finally { 40 hasBroke = true; 41 bail && bail(); 42 } 43 return evts; 44}; 45 46export type Deferrable = { 47 resolve: () => void; 48 complete: Promise<void>; 49}; 50 51export const createDeferrable = (): Deferrable => { 52 let resolve!: () => void; 53 const promise: Promise<void> = new Promise((res) => { 54 resolve = () => res(); 55 }); 56 return { resolve, complete: promise }; 57}; 58 59export const createDeferrables = (count: number): Deferrable[] => { 60 const list: Deferrable[] = []; 61 for (let i = 0; i < count; i++) { 62 list.push(createDeferrable()); 63 } 64 return list; 65}; 66 67export const allComplete = async (deferrables: Deferrable[]): Promise<void> => { 68 await Promise.all(deferrables.map((d) => d.complete)); 69}; 70 71export class AsyncBuffer<T> { 72 private buffer: T[] = []; 73 private promise: Promise<void>; 74 private resolve: () => void; 75 private closed = false; 76 private toThrow: unknown | undefined; 77 78 constructor(public maxSize?: number) { 79 // Initializing to satisfy types/build, immediately reset by resetPromise() 80 this.promise = Promise.resolve(); 81 this.resolve = () => null; 82 this.resetPromise(); 83 } 84 85 get curr(): T[] { 86 return this.buffer; 87 } 88 89 get size(): number { 90 return this.buffer.length; 91 } 92 93 get isClosed(): boolean { 94 return this.closed; 95 } 96 97 resetPromise() { 98 this.promise = new Promise<void>((r) => (this.resolve = r)); 99 } 100 101 push(item: T) { 102 this.buffer.push(item); 103 this.resolve(); 104 } 105 106 pushMany(items: T[]) { 107 items.forEach((i) => this.buffer.push(i)); 108 this.resolve(); 109 } 110 111 async *events(): AsyncGenerator<T> { 112 while (true) { 113 if (this.closed && this.buffer.length === 0) { 114 if (this.toThrow) { 115 throw this.toThrow; 116 } else { 117 return; 118 } 119 } 120 await this.promise; 121 if (this.toThrow) { 122 throw this.toThrow; 123 } 124 if (this.maxSize && this.size > this.maxSize) { 125 throw new AsyncBufferFullError(this.maxSize); 126 } 127 const [first, ...rest] = this.buffer; 128 if (first) { 129 this.buffer = rest; 130 yield first; 131 } else { 132 this.resetPromise(); 133 } 134 } 135 } 136 137 throw(err: unknown) { 138 this.toThrow = err; 139 this.closed = true; 140 this.resolve(); 141 } 142 143 close() { 144 this.closed = true; 145 this.resolve(); 146 } 147} 148 149export class AsyncBufferFullError extends Error { 150 constructor(maxSize: number) { 151 super(`ReachedMaxBufferSize: ${maxSize}`); 152 } 153} 154 155/** 156 * Utility function that behaves like {@link Promise.allSettled} but returns the 157 * same result as {@link Promise.all} in case every promise is fulfilled, and 158 * throws an {@link AggregateError} if there are more than one errors. 159 */ 160export function allFulfilled<T extends readonly unknown[] | []>( 161 promises: T, 162): Promise<{ -readonly [P in keyof T]: Awaited<T[P]> }>; 163export function allFulfilled<T>( 164 promises: Iterable<T | PromiseLike<T>>, 165): Promise<Awaited<T>[]>; 166export function allFulfilled( 167 promises: Iterable<Promise<unknown>>, 168): Promise<unknown[]> { 169 return Promise.allSettled(promises).then(handleAllSettledErrors); 170} 171 172export function handleAllSettledErrors< 173 T extends readonly PromiseSettledResult<unknown>[] | [], 174>( 175 results: T, 176): { 177 -readonly [P in keyof T]: T[P] extends PromiseSettledResult<infer U> ? U 178 : never; 179}; 180export function handleAllSettledErrors<T>( 181 results: PromiseSettledResult<T>[], 182): T[]; 183export function handleAllSettledErrors( 184 results: PromiseSettledResult<unknown>[], 185): unknown[] { 186 if (results.every(isFulfilledResult)) return results.map(extractValue); 187 188 const errors = results.filter(isRejectedResult).map(extractReason); 189 throw errors; 190} 191 192export function isRejectedResult( 193 result: PromiseSettledResult<unknown>, 194): result is PromiseRejectedResult { 195 return result.status === "rejected"; 196} 197 198function extractReason(result: PromiseRejectedResult): unknown { 199 return result.reason; 200} 201 202export function isFulfilledResult<T>( 203 result: PromiseSettledResult<T>, 204): result is PromiseFulfilledResult<T> { 205 return result.status === "fulfilled"; 206} 207 208function extractValue<T>(result: PromiseFulfilledResult<T>): T { 209 return result.value; 210}