// Suite of AT Protocol TypeScript libraries built on web standards
1import { bailableWait } from "./util.ts";
2
/**
 * Reads values from an async generator into a list.
 *
 * Reading stops as soon as one of the following happens:
 * - the generator reports `done`,
 * - `maxLength` values have been collected, or
 * - `waitFor` has resolved AND `isDone` has returned a truthy value. `isDone`
 *   is polled with the most recently collected value (`undefined` if none),
 *   with a `bailableWait(20)` pause between polls.
 *
 * NOTE: this does not signal the generator to close; it *will* continue to
 * produce values after this function returns.
 *
 * @param gen Generator to read from.
 * @param isDone Predicate deciding whether enough has been read.
 * @param waitFor Polling of `isDone` only starts once this promise resolves.
 * @param maxLength Upper bound on the number of collected values.
 * @returns The values collected before reading stopped.
 * @throws Whatever `gen.next()`, `isDone` or `waitFor` rejects with.
 */
export const readFromGenerator = async <T>(
  gen: AsyncGenerator<T>,
  isDone: (last?: T) => Promise<boolean> | boolean,
  waitFor: Promise<unknown> = Promise.resolve(),
  maxLength = Number.MAX_SAFE_INTEGER,
): Promise<T[]> => {
  const evts: T[] = [];
  let bail: undefined | (() => void);
  let hasBroke = false;

  // Polls `isDone` until it is satisfied or the read loop has already exited.
  const awaitDone = async (): Promise<boolean> => {
    while (!(await isDone(evts.at(-1)))) {
      // The read loop may have finished while `isDone` was pending.
      if (hasBroke) return false;
      const bailable = bailableWait(20);
      // Publish the bail handle BEFORE awaiting, so the `finally` clause below
      // can cut the in-flight wait short. Assigning it after the wait (as was
      // done previously) only ever exposed handles of waits that had already
      // finished.
      // NOTE(review): assumes `bail()` settles the wait without rejecting —
      // confirm against `bailableWait` in ./util.ts.
      bail = bailable.bail;
      await bailable.wait();
      if (hasBroke) return false;
    }
    return true;
  };

  // Kept as an explicit promise (rather than a `.then` chain) so the success
  // path settles on the same microtask schedule as before; failures of
  // `waitFor` / `isDone` are now forwarded instead of being left unhandled.
  const breakOn: Promise<void> = new Promise((resolve, reject) => {
    waitFor.then(() => {
      awaitDone().then(() => resolve(), reject);
    }, reject);
  });

  try {
    while (evts.length < maxLength) {
      const maybeEvt = await Promise.race([gen.next(), breakOn]);
      // `breakOn` resolves with `undefined`; an iterator result is an object.
      if (!maybeEvt) break;
      const evt = maybeEvt as IteratorResult<T>;
      if (evt.done) break;
      evts.push(evt.value);
    }
  } finally {
    hasBroke = true;
    bail?.();
  }
  return evts;
};
45
/**
 * A promise bundled together with a function intended to settle it.
 */
export type Deferrable = {
  /** Promise that callers can await. */
  complete: Promise<void>;
  /** Callback associated with `complete`; takes no arguments. */
  resolve: () => void;
};
50
/**
 * Creates a pending promise together with the function that fulfils it.
 *
 * @returns An object whose `complete` promise stays pending until `resolve`
 * is invoked.
 */
export const createDeferrable = (): Deferrable => {
  // Placeholder only; the promise executor below runs synchronously and
  // replaces it before this function returns.
  let release: () => void = () => {};
  const complete = new Promise<void>((fulfil) => {
    release = () => fulfil();
  });
  return { resolve: release, complete };
};
58
/**
 * Builds a list of independent deferrables.
 *
 * @param count Number of deferrables to create; values of zero or below
 * produce an empty list.
 * @returns The newly created deferrables.
 */
export const createDeferrables = (count: number): Deferrable[] => {
  const created: Deferrable[] = [];
  while (created.length < count) {
    created.push(createDeferrable());
  }
  return created;
};
66
/**
 * Waits until the `complete` promise of every given deferrable has fulfilled.
 *
 * @param deferrables Deferrables to wait on.
 */
export const allComplete = async (deferrables: Deferrable[]): Promise<void> => {
  const completions = deferrables.map(({ complete }) => complete);
  await Promise.all(completions);
};
70
/**
 * FIFO buffer that lets producers push items synchronously while a consumer
 * drains them through the async generator returned by {@link events}.
 *
 * An internal promise acts as the "something happened" signal: it is resolved
 * by every push, `close()` and `throw()`, and replaced with a fresh pending
 * promise whenever the consumer finds the buffer empty.
 */
export class AsyncBuffer<T> {
  private buffer: T[] = [];
  private promise: Promise<void>;
  private resolve: () => void;
  private closed = false;
  private toThrow: unknown | undefined;

  /**
   * @param maxSize Optional size limit. It is only checked by `events()`; a
   * value of `undefined` or `0` disables the check.
   */
  constructor(public maxSize?: number) {
    // Initializing to satisfy types/build, immediately reset by resetPromise()
    this.promise = Promise.resolve();
    this.resolve = () => null;
    this.resetPromise();
  }

  /** The items currently buffered, oldest first. */
  get curr(): T[] {
    return this.buffer;
  }

  /** Number of items currently buffered. */
  get size(): number {
    return this.buffer.length;
  }

  /** Whether `close()` or `throw()` has been called. */
  get isClosed(): boolean {
    return this.closed;
  }

  /** Replaces the wake-up promise with a fresh, pending one. */
  resetPromise(): void {
    this.promise = new Promise<void>((r) => (this.resolve = r));
  }

  /** Appends one item and wakes the consumer. */
  push(item: T): void {
    this.buffer.push(item);
    this.resolve();
  }

  /** Appends several items in order and wakes the consumer once. */
  pushMany(items: T[]): void {
    items.forEach((i) => this.buffer.push(i));
    this.resolve();
  }

  /**
   * Yields buffered items in insertion order, waiting while the buffer is
   * empty.
   *
   * Completes once the buffer has been closed and drained.
   *
   * @throws The value passed to `throw()` (if truthy), as soon as the consumer
   * observes it; items still buffered at that point are not yielded.
   * @throws AsyncBufferFullError when, on wake-up, the buffer holds more than
   * `maxSize` items.
   */
  async *events(): AsyncGenerator<T> {
    while (true) {
      if (this.closed && this.buffer.length === 0) {
        if (this.toThrow) {
          throw this.toThrow;
        }
        return;
      }
      await this.promise;
      if (this.toThrow) {
        throw this.toThrow;
      }
      if (this.maxSize && this.size > this.maxSize) {
        throw new AsyncBufferFullError(this.maxSize);
      }
      // Decide on the buffer's length, not on the truthiness of its head:
      // items such as 0, "", false or null are legitimate values, and testing
      // the item itself would leave them stuck at the front forever.
      if (this.buffer.length > 0) {
        const [first, ...rest] = this.buffer;
        this.buffer = rest;
        // The length check above guarantees `first` is a real element.
        yield first as T;
      } else {
        this.resetPromise();
      }
    }
  }

  /** Closes the buffer and makes `events()` throw `err` to its consumer. */
  throw(err: unknown): void {
    this.toThrow = err;
    this.closed = true;
    this.resolve();
  }

  /** Closes the buffer; `events()` completes after draining what is left. */
  close(): void {
    this.closed = true;
    this.resolve();
  }
}
148
/**
 * Error describing a buffer that exceeded its configured maximum size.
 */
export class AsyncBufferFullError extends Error {
  /**
   * @param maxSize The size limit, included in the error message.
   */
  constructor(maxSize: number) {
    const message = "ReachedMaxBufferSize: " + maxSize;
    super(message);
  }
}
154
/**
 * Waits for every given promise to settle (like {@link Promise.allSettled})
 * and then hands the settled results to `handleAllSettledErrors`.
 *
 * When every promise fulfilled, the returned promise resolves with the
 * fulfilment values in input order, i.e. the same result {@link Promise.all}
 * would give. Otherwise it rejects with whatever `handleAllSettledErrors`
 * throws for the collected rejections.
 */
export function allFulfilled<T extends readonly unknown[] | []>(
  promises: T,
): Promise<{ -readonly [P in keyof T]: Awaited<T[P]> }>;
export function allFulfilled<T>(
  promises: Iterable<T | PromiseLike<T>>,
): Promise<Awaited<T>[]>;
export async function allFulfilled(
  promises: Iterable<Promise<unknown>>,
): Promise<unknown[]> {
  const settled = await Promise.allSettled(promises);
  return handleAllSettledErrors(settled);
}
171
/**
 * Converts the output of {@link Promise.allSettled} into the result
 * {@link Promise.all} would have produced.
 *
 * @param results Settled results, in input order.
 * @returns The fulfilment values in input order when nothing was rejected
 * (an empty list for empty input).
 * @throws The rejection reason itself when exactly one result was rejected,
 * mirroring {@link Promise.all}.
 * @throws AggregateError Wrapping every rejection reason (in input order, on
 * its `errors` property) when more than one result was rejected.
 */
export function handleAllSettledErrors<
  T extends readonly PromiseSettledResult<unknown>[] | [],
>(
  results: T,
): {
  -readonly [P in keyof T]: T[P] extends PromiseSettledResult<infer U> ? U
    : never;
};
export function handleAllSettledErrors<T>(
  results: PromiseSettledResult<T>[],
): T[];
export function handleAllSettledErrors(
  results: PromiseSettledResult<unknown>[],
): unknown[] {
  const values: unknown[] = [];
  const reasons: unknown[] = [];
  for (const result of results) {
    if (result.status === "fulfilled") {
      values.push(result.value);
    } else {
      reasons.push(result.reason);
    }
  }

  if (reasons.length === 0) return values;

  // A single failure is surfaced unchanged so callers keep its type and stack.
  if (reasons.length === 1) throw reasons[0];

  // Never throw a bare array: wrap multiple failures in a real Error.
  throw new AggregateError(
    reasons,
    `${reasons.length} promises were rejected`,
  );
}
191
/**
 * Type guard narrowing a settled result to its rejected variant.
 *
 * @param result A result as produced by {@link Promise.allSettled}.
 * @returns `true` when the result's status is `"rejected"`.
 */
export function isRejectedResult(
  result: PromiseSettledResult<unknown>,
): result is PromiseRejectedResult {
  const { status } = result;
  return status === "rejected";
}
197
/** Returns the rejection reason carried by a rejected settled result. */
function extractReason(result: PromiseRejectedResult): unknown {
  const { reason } = result;
  return reason;
}
201
/**
 * Type guard narrowing a settled result to its fulfilled variant.
 *
 * @param result A result as produced by {@link Promise.allSettled}.
 * @returns `true` when the result's status is `"fulfilled"`.
 */
export function isFulfilledResult<T>(
  result: PromiseSettledResult<T>,
): result is PromiseFulfilledResult<T> {
  const { status } = result;
  return status === "fulfilled";
}
207
/** Returns the value carried by a fulfilled settled result. */
function extractValue<T>(result: PromiseFulfilledResult<T>): T {
  const { value } = result;
  return value;
}