// Suite of AT Protocol TypeScript libraries built on web standards.
1import { bailableWait } from "./util.ts";
2
/**
 * Collects events from `gen` into an array until one of the following
 * happens: the generator reports `done`, `maxLength` events have been
 * collected, or the `isDone` poll reports completion.
 *
 * The `isDone` poll only starts once `waitFor` has resolved. It is evaluated
 * against the most recently collected event (`undefined` if nothing has been
 * collected yet) and re-evaluated after every `bailableWait(20)` pause until
 * it returns `true` or reading has already stopped.
 *
 * A rejection of `waitFor` or of `isDone` is not forwarded to the caller.
 *
 * @param gen - Source generator. It is only advanced with `next()`; it is
 *   not explicitly closed when reading stops early.
 * @param isDone - Completion predicate, sync or async.
 * @param waitFor - Gate that must resolve before `isDone` is first polled.
 *   Defaults to an already-resolved promise.
 * @param maxLength - Upper bound on the number of collected events.
 * @returns The collected events in the order they were yielded.
 */
export const readFromGenerator = async <T>(
  gen: AsyncGenerator<T>,
  isDone: (last?: T) => Promise<boolean> | boolean,
  waitFor: Promise<unknown> = Promise.resolve(),
  maxLength = Number.MAX_SAFE_INTEGER,
): Promise<T[]> => {
  const evts: T[] = [];
  let bail: undefined | (() => void);
  let hasBroke = false;

  // Polls `isDone` until it is satisfied (true) or the read loop below has
  // already finished for another reason (false).
  const awaitDone = async (): Promise<boolean> => {
    while (!(await isDone(evts.at(-1)))) {
      const bailable = bailableWait(20);
      // Publish the cancel handle BEFORE suspending. Assigning it only after
      // `wait()` had settled meant the `finally` below could never interrupt
      // a pause that was still in flight.
      // NOTE(review): assumes `bail()` makes the pending `wait()` settle
      // early - confirm against ./util.ts.
      bail = bailable.bail;
      await bailable.wait();
      if (hasBroke) return false;
    }
    return true;
  };

  // Resolves (with `undefined`) once polling has ended; raced against
  // `gen.next()` so a satisfied `isDone` can interrupt a pending read.
  const breakOn: Promise<void> = new Promise((resolve) => {
    waitFor.then(() => {
      awaitDone().then(() => resolve());
    });
  });

  try {
    while (evts.length < maxLength) {
      const maybeEvt = await Promise.race([gen.next(), breakOn]);
      // `breakOn` resolves to `undefined`; an iterator result is always an object.
      if (!maybeEvt) break;
      const evt = maybeEvt as IteratorResult<T>;
      if (evt.done) break;
      evts.push(evt.value);
    }
  } finally {
    // Tell the poller to stop and cut short any pause it is currently in.
    hasBroke = true;
    bail?.();
  }
  return evts;
};
42
/**
 * A promise bundled with the function that settles it, so that completion can
 * be triggered from outside the promise's executor.
 */
export type Deferrable = {
  /** Trigger for `complete`; takes no arguments and returns nothing. */
  resolve: () => void;
  /** Promise that callers await to learn that the work has finished. */
  complete: Promise<void>;
};
47
/**
 * Builds a `Deferrable`: a pending promise plus a zero-argument function that
 * fulfils it.
 *
 * @returns An object whose `complete` promise stays pending until `resolve()`
 *   is called.
 */
export const createDeferrable = (): Deferrable => {
  // Assigned synchronously inside the Promise executor below.
  let settle!: () => void;
  const complete = new Promise<void>((fulfil) => {
    settle = () => fulfil();
  });
  return { resolve: settle, complete };
};
55
/**
 * Creates `count` independent deferrables.
 *
 * @param count - How many deferrables to create; values of zero or less
 *   produce an empty list.
 * @returns A new array of freshly created deferrables.
 */
export const createDeferrables = (count: number): Deferrable[] => {
  const created: Deferrable[] = [];
  while (created.length < count) {
    created.push(createDeferrable());
  }
  return created;
};
63
/**
 * Waits until the `complete` promise of every given deferrable has fulfilled.
 *
 * @param deferrables - Deferrables to wait on; an empty list resolves at once.
 */
export const allComplete = async (deferrables: Deferrable[]): Promise<void> => {
  const pending = deferrables.map(({ complete }) => complete);
  await Promise.all(pending);
};
67
/**
 * In-memory FIFO queue that is filled synchronously through `push` /
 * `pushMany` and drained asynchronously through the `events()` generator.
 *
 * An internal promise serves as a "something may have changed" signal:
 * `push`, `pushMany`, `close` and `throw` resolve it, and `events()` replaces
 * it with a fresh pending promise whenever it finds the queue empty.
 */
export class AsyncBuffer<T> {
  /** Items pushed but not yet yielded, oldest first. */
  private buffer: T[] = [];
  /** Signal awaited by `events()`. */
  private promise: Promise<void>;
  /** Resolver belonging to the current `promise`. */
  private resolve: () => void;
  private closed = false;
  /** Value handed to `throw()`, rethrown from `events()`. */
  private toThrow: unknown | undefined;

  /**
   * @param maxSize - Optional limit on queued items. The limit is enforced by
   *   `events()`, which throws `AsyncBufferFullError` when it observes more
   *   than `maxSize` queued items. `undefined` or `0` disables the check.
   */
  constructor(public maxSize?: number) {
    // Placeholders satisfy definite assignment; resetPromise() installs the
    // real signal immediately afterwards.
    this.promise = Promise.resolve();
    this.resolve = () => null;
    this.resetPromise();
  }

  /** The live internal array of queued items (not a copy). */
  get curr(): T[] {
    return this.buffer;
  }

  /** Number of items currently queued. */
  get size(): number {
    return this.buffer.length;
  }

  /** Whether `close()` or `throw()` has been called. */
  get isClosed(): boolean {
    return this.closed;
  }

  /** Replaces the signal with a fresh pending promise and its resolver. */
  resetPromise() {
    this.promise = new Promise<void>((r) => (this.resolve = r));
  }

  /** Appends one item and wakes a waiting consumer. */
  push(item: T) {
    this.buffer.push(item);
    this.resolve();
  }

  /** Appends several items in order and wakes a waiting consumer. */
  pushMany(items: T[]) {
    items.forEach((i) => this.buffer.push(i));
    this.resolve();
  }

  /**
   * Yields queued items in insertion order, waiting for new ones when the
   * queue is empty.
   *
   * The generator returns once the buffer is closed and drained. If `throw()`
   * was called with a truthy value, that value is thrown as soon as it is
   * observed, even when items are still queued. A falsy value passed to
   * `throw()` behaves like `close()`.
   *
   * @throws AsyncBufferFullError when more than `maxSize` items are queued.
   */
  async *events(): AsyncGenerator<T> {
    while (true) {
      if (this.closed && this.buffer.length === 0) {
        if (this.toThrow) {
          throw this.toThrow;
        } else {
          return;
        }
      }
      await this.promise;
      if (this.toThrow) {
        throw this.toThrow;
      }
      if (this.maxSize && this.size > this.maxSize) {
        throw new AsyncBufferFullError(this.maxSize);
      }
      // Decide by length, not by truthiness of the head item: a queued `0`,
      // `""`, `false`, `null` or `undefined` is a legitimate value and must
      // be delivered rather than mistaken for an empty queue.
      if (this.buffer.length > 0) {
        const [head, ...tail] = this.buffer;
        this.buffer = tail;
        yield head as T;
      } else {
        // Nothing queued: re-arm the signal and wait for the next producer.
        this.resetPromise();
      }
    }
  }

  /** Closes the buffer and makes `events()` throw `err`. */
  throw(err: unknown) {
    this.toThrow = err;
    this.closed = true;
    this.resolve();
  }

  /** Closes the buffer; `events()` ends after the remaining items are yielded. */
  close() {
    this.closed = true;
    this.resolve();
  }
}
144
/**
 * Error thrown by `AsyncBuffer.events()` when the number of queued items
 * exceeds the buffer's configured `maxSize`.
 */
export class AsyncBufferFullError extends Error {
  /**
   * @param maxSize - The limit that was exceeded; included in the message.
   */
  constructor(maxSize: number) {
    const message = `ReachedMaxBufferSize: ${maxSize}`;
    super(message);
  }
}
150
/**
 * Like `Promise.all`, but waits for every input to settle before reporting
 * failure.
 *
 * Resolves with the fulfilled values in input order when nothing rejected.
 * Otherwise rejects with whatever `handleAllSettledErrors` throws for the
 * settled results.
 *
 * @param promises - Values or promises to wait on.
 */
export function allFulfilled<T extends readonly unknown[] | []>(
  promises: T,
): Promise<{ -readonly [P in keyof T]: Awaited<T[P]> }>;
export function allFulfilled<T>(
  promises: Iterable<T | PromiseLike<T>>,
): Promise<Awaited<T>[]>;
export function allFulfilled(
  promises: Iterable<Promise<unknown>>,
): Promise<unknown[]> {
  const settled = Promise.allSettled(promises);
  return settled.then((results) => handleAllSettledErrors(results));
}
162
/**
 * Unwraps the output of `Promise.allSettled`.
 *
 * @param results - Settled results to inspect.
 * @returns The fulfilled values, in input order, when no result is rejected.
 * @throws An array holding the rejection reasons (in input order) when at
 *   least one result is rejected. Note that the thrown value is a plain
 *   array, not an `Error` instance.
 */
export function handleAllSettledErrors<
  T extends readonly PromiseSettledResult<unknown>[] | [],
>(
  results: T,
): {
  -readonly [P in keyof T]: T[P] extends PromiseSettledResult<infer U> ? U
    : never;
};
export function handleAllSettledErrors<T>(
  results: PromiseSettledResult<T>[],
): T[];
export function handleAllSettledErrors(
  results: PromiseSettledResult<unknown>[],
): unknown[] {
  if (!results.every(isFulfilledResult)) {
    const rejected = results.filter(isRejectedResult);
    throw rejected.map(extractReason);
  }
  return results.map(extractValue);
}
182
/**
 * Type guard: narrows a settled result to its rejected variant.
 *
 * @returns `true` when `result.status` is `"rejected"`.
 */
export function isRejectedResult(
  result: PromiseSettledResult<unknown>,
): result is PromiseRejectedResult {
  const { status } = result;
  return status === "rejected";
}
188
/** Returns the `reason` carried by a rejected settled result. */
function extractReason({ reason }: PromiseRejectedResult): unknown {
  return reason;
}
192
/**
 * Type guard: narrows a settled result to its fulfilled variant.
 *
 * @returns `true` when `result.status` is `"fulfilled"`.
 */
export function isFulfilledResult<T>(
  result: PromiseSettledResult<T>,
): result is PromiseFulfilledResult<T> {
  const { status } = result;
  return status === "fulfilled";
}
198
/** Returns the `value` carried by a fulfilled settled result. */
function extractValue<T>({ value }: PromiseFulfilledResult<T>): T {
  return value;
}