Suite of AT Protocol TypeScript libraries built on web standards
21
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 201 lines 4.8 kB view raw
import { bailableWait } from "./util.ts";

/**
 * Drains an async generator into an array.
 *
 * Collection stops when any of the following happens:
 * - the generator reports `done`,
 * - `maxLength` items have been collected,
 * - after `waitFor` has resolved, `isDone` returns a truthy value for the most
 *   recently collected item (`undefined` if nothing has been collected yet).
 *
 * `isDone` is re-polled after each `bailableWait(20)` wait (presumably 20 ms —
 * `bailableWait` is defined in `./util.ts`; confirm there). The generator is
 * not closed by this function; a `gen.next()` that lost the race is left
 * pending.
 *
 * @param gen - Generator to read from.
 * @param isDone - Predicate polled with the last collected item.
 * @param waitFor - Polling of `isDone` only starts once this resolves.
 * @param maxLength - Upper bound on the number of collected items.
 * @returns The items collected so far, in arrival order.
 */
export const readFromGenerator = async <T>(
  gen: AsyncGenerator<T>,
  isDone: (last?: T) => Promise<boolean> | boolean,
  waitFor: Promise<unknown> = Promise.resolve(),
  maxLength = Number.MAX_SAFE_INTEGER,
): Promise<T[]> => {
  const evts: T[] = [];
  let bail: undefined | (() => void);
  let hasBroke = false;

  // Polls `isDone` until it is satisfied (true) or the read loop has already
  // exited (false).
  const awaitDone = async (): Promise<boolean> => {
    while (!(await isDone(evts.at(-1)))) {
      const bailable = bailableWait(20);
      // Publish the cancel handle BEFORE awaiting, so the `finally` clause
      // below can cut the in-flight wait short. Assigning it after the await
      // (as before) meant the handle was only ever set for finished waits.
      // NOTE(review): assumes `bail()` makes `wait()` settle early — confirm
      // against ./util.ts.
      bail = bailable.bail;
      await bailable.wait();
      if (hasBroke) return false;
    }
    return true;
  };

  // Resolves (with undefined) once polling ends; races against `gen.next()`.
  const breakOn: Promise<void> = new Promise((resolve) => {
    waitFor.then(() => {
      awaitDone().then(() => resolve());
    });
  });

  try {
    while (evts.length < maxLength) {
      const maybeEvt = await Promise.race([gen.next(), breakOn]);
      // `breakOn` resolves to undefined; an iterator result is always an object.
      if (!maybeEvt) break;
      const evt = maybeEvt as IteratorResult<T>;
      if (evt.done) break;
      evts.push(evt.value);
    }
  } finally {
    // Tell the poller to stop and cancel any wait that is still pending.
    hasBroke = true;
    bail?.();
  }
  return evts;
};

/** A promise paired with the function that resolves it. */
export type Deferrable = {
  resolve: () => void;
  complete: Promise<void>;
};

/**
 * Creates a single {@link Deferrable} whose `complete` promise settles when
 * `resolve` is called.
 */
export const createDeferrable = (): Deferrable => {
  let resolve!: () => void;
  const promise: Promise<void> = new Promise((res) => {
    resolve = () => res();
  });
  return { resolve, complete: promise };
};

/**
 * Creates `count` independent deferrables.
 *
 * @param count - Number of deferrables to create; values `<= 0` yield `[]`.
 */
export const createDeferrables = (count: number): Deferrable[] => {
  const list: Deferrable[] = [];
  for (let i = 0; i < count; i++) {
    list.push(createDeferrable());
  }
  return list;
};

/** Resolves once every deferrable in `deferrables` has completed. */
export const allComplete = async (deferrables: Deferrable[]): Promise<void> => {
  await Promise.all(deferrables.map((d) => d.complete));
};

/**
 * A push-based queue that can be consumed as an async generator.
 *
 * Producers call `push` / `pushMany`; a consumer iterates `events()`.
 * `close()` ends the stream once buffered items are drained, while
 * `throw(err)` makes the consumer throw `err`.
 */
export class AsyncBuffer<T> {
  private buffer: T[] = [];
  private promise: Promise<void>;
  private resolve: () => void;
  private closed = false;
  private toThrow: unknown | undefined;

  /**
   * @param maxSize - Optional limit; `events()` throws
   *   {@link AsyncBufferFullError} when more than `maxSize` items are buffered.
   *   A falsy value (including `0`) disables the check.
   */
  constructor(public maxSize?: number) {
    // Placeholder assignments satisfy definite-assignment checks; the real
    // promise/resolver pair is installed by resetPromise().
    this.promise = Promise.resolve();
    this.resolve = () => null;
    this.resetPromise();
  }

  /** The live array of currently buffered items. */
  get curr(): T[] {
    return this.buffer;
  }

  /** Number of currently buffered items. */
  get size(): number {
    return this.buffer.length;
  }

  /** Whether `close()` or `throw()` has been called. */
  get isClosed(): boolean {
    return this.closed;
  }

  /** Installs a fresh pending promise that the next push/close/throw resolves. */
  resetPromise() {
    this.promise = new Promise<void>((r) => (this.resolve = r));
  }

  /** Appends one item and wakes the consumer. */
  push(item: T) {
    this.buffer.push(item);
    this.resolve();
  }

  /** Appends several items, in order, and wakes the consumer once. */
  pushMany(items: T[]) {
    for (const item of items) {
      this.buffer.push(item);
    }
    this.resolve();
  }

  /**
   * Yields buffered items in FIFO order, waiting for new ones when empty.
   *
   * @throws The value passed to `throw()`, if any (when truthy).
   * @throws AsyncBufferFullError when the buffer holds more than `maxSize` items.
   */
  async *events(): AsyncGenerator<T> {
    while (true) {
      if (this.closed && this.buffer.length === 0) {
        if (this.toThrow) {
          throw this.toThrow;
        }
        return;
      }
      await this.promise;
      if (this.toThrow) {
        throw this.toThrow;
      }
      if (this.maxSize && this.size > this.maxSize) {
        throw new AsyncBufferFullError(this.maxSize);
      }
      // Decide emptiness by length, not by the truthiness of the head item:
      // a buffered `0`, `""`, `false` or `null` is a real value and must be
      // yielded rather than mistaken for "nothing to read".
      if (this.buffer.length > 0) {
        yield this.buffer.shift() as T;
      } else {
        // Drained: arm a new promise so the next loop iteration waits.
        this.resetPromise();
      }
    }
  }

  /** Closes the buffer and makes `events()` throw `err`. */
  throw(err: unknown) {
    this.toThrow = err;
    this.closed = true;
    this.resolve();
  }

  /** Closes the buffer; `events()` returns after remaining items are drained. */
  close() {
    this.closed = true;
    this.resolve();
  }
}

/** Thrown by `AsyncBuffer.events()` when the buffer exceeds its `maxSize`. */
export class AsyncBufferFullError extends Error {
  constructor(maxSize: number) {
    super(`ReachedMaxBufferSize: ${maxSize}`);
  }
}

/**
 * Like `Promise.all`, but waits for every promise to settle before failing.
 *
 * @returns The fulfilled values, in input order.
 * @throws An array containing every rejection reason if any promise rejected
 *   (see {@link handleAllSettledErrors}).
 */
export function allFulfilled<T extends readonly unknown[] | []>(
  promises: T,
): Promise<{ -readonly [P in keyof T]: Awaited<T[P]> }>;
export function allFulfilled<T>(
  promises: Iterable<T | PromiseLike<T>>,
): Promise<Awaited<T>[]>;
export function allFulfilled(
  promises: Iterable<Promise<unknown>>,
): Promise<unknown[]> {
  return Promise.allSettled(promises).then(handleAllSettledErrors);
}

/**
 * Unwraps the output of `Promise.allSettled`.
 *
 * @returns The fulfilled values, in order, when every result is fulfilled.
 * @throws A plain array (not an `Error`) of the rejection reasons, in order,
 *   when at least one result is rejected.
 */
export function handleAllSettledErrors<
  T extends readonly PromiseSettledResult<unknown>[] | [],
>(
  results: T,
): {
  -readonly [P in keyof T]: T[P] extends PromiseSettledResult<infer U> ? U
    : never;
};
export function handleAllSettledErrors<T>(
  results: PromiseSettledResult<T>[],
): T[];
export function handleAllSettledErrors(
  results: PromiseSettledResult<unknown>[],
): unknown[] {
  if (results.every(isFulfilledResult)) return results.map(extractValue);

  const errors = results.filter(isRejectedResult).map(extractReason);
  throw errors;
}

/** Type guard: `result` is a rejected settled result. */
export function isRejectedResult(
  result: PromiseSettledResult<unknown>,
): result is PromiseRejectedResult {
  return result.status === "rejected";
}

/** Returns the rejection reason of a rejected result. */
function extractReason(result: PromiseRejectedResult): unknown {
  return result.reason;
}

/** Type guard: `result` is a fulfilled settled result. */
export function isFulfilledResult<T>(
  result: PromiseSettledResult<T>,
): result is PromiseFulfilledResult<T> {
  return result.status === "fulfilled";
}

/** Returns the value of a fulfilled result. */
function extractValue<T>(result: PromiseFulfilledResult<T>): T {
  return result.value;
}