fork of hey-api/openapi-ts because I need some additional things
1// This file is auto-generated by @hey-api/openapi-ts
2
3import type { Config } from './types.gen';
4
5export type ServerSentEventsOptions<TData = unknown> = Omit<RequestInit, 'method'> &
6 Pick<Config, 'method' | 'responseTransformer' | 'responseValidator'> & {
7 /**
8 * Fetch API implementation. You can use this option to provide a custom
9 * fetch instance.
10 *
11 * @default globalThis.fetch
12 */
13 fetch?: typeof fetch;
14 /**
15 * Implementing clients can call request interceptors inside this hook.
16 */
17 onRequest?: (url: string, init: RequestInit) => Promise<Request>;
18 /**
19 * Callback invoked when a network or parsing error occurs during streaming.
20 *
21 * This option applies only if the endpoint returns a stream of events.
22 *
23 * @param error The error that occurred.
24 */
25 onSseError?: (error: unknown) => void;
26 /**
27 * Callback invoked when an event is streamed from the server.
28 *
29 * This option applies only if the endpoint returns a stream of events.
30 *
31 * @param event Event streamed from the server.
32 * @returns Nothing (void).
33 */
34 onSseEvent?: (event: StreamEvent<TData>) => void;
35 serializedBody?: RequestInit['body'];
36 /**
37 * Default retry delay in milliseconds.
38 *
39 * This option applies only if the endpoint returns a stream of events.
40 *
41 * @default 3000
42 */
43 sseDefaultRetryDelay?: number;
44 /**
45 * Maximum number of retry attempts before giving up.
46 */
47 sseMaxRetryAttempts?: number;
48 /**
49 * Maximum retry delay in milliseconds.
50 *
51 * Applies only when exponential backoff is used.
52 *
53 * This option applies only if the endpoint returns a stream of events.
54 *
55 * @default 30000
56 */
57 sseMaxRetryDelay?: number;
58 /**
59 * Optional sleep function for retry backoff.
60 *
61 * Defaults to using `setTimeout`.
62 */
63 sseSleepFn?: (ms: number) => Promise<void>;
64 url: string;
65 };
66
67export interface StreamEvent<TData = unknown> {
68 data: TData;
69 event?: string;
70 id?: string;
71 retry?: number;
72}
73
74export type ServerSentEventsResult<TData = unknown, TReturn = void, TNext = unknown> = {
75 stream: AsyncGenerator<
76 TData extends Record<string, unknown> ? TData[keyof TData] : TData,
77 TReturn,
78 TNext
79 >;
80};
81
82export function createSseClient<TData = unknown>({
83 onRequest,
84 onSseError,
85 onSseEvent,
86 responseTransformer,
87 responseValidator,
88 sseDefaultRetryDelay,
89 sseMaxRetryAttempts,
90 sseMaxRetryDelay,
91 sseSleepFn,
92 url,
93 ...options
94}: ServerSentEventsOptions): ServerSentEventsResult<TData> {
95 let lastEventId: string | undefined;
96
97 const sleep = sseSleepFn ?? ((ms: number) => new Promise((resolve) => setTimeout(resolve, ms)));
98
99 const createStream = async function* () {
100 let retryDelay: number = sseDefaultRetryDelay ?? 3000;
101 let attempt = 0;
102 const signal = options.signal ?? new AbortController().signal;
103
104 while (true) {
105 if (signal.aborted) break;
106
107 attempt++;
108
109 const headers =
110 options.headers instanceof Headers
111 ? options.headers
112 : new Headers(options.headers as Record<string, string> | undefined);
113
114 if (lastEventId !== undefined) {
115 headers.set('Last-Event-ID', lastEventId);
116 }
117
118 try {
119 const requestInit: RequestInit = {
120 redirect: 'follow',
121 ...options,
122 body: options.serializedBody,
123 headers,
124 signal,
125 };
126 let request = new Request(url, requestInit);
127 if (onRequest) {
128 request = await onRequest(url, requestInit);
129 }
130 // fetch must be assigned here, otherwise it would throw the error:
131 // TypeError: Failed to execute 'fetch' on 'Window': Illegal invocation
132 const _fetch = options.fetch ?? globalThis.fetch;
133 const response = await _fetch(request);
134
135 if (!response.ok) throw new Error(`SSE failed: ${response.status} ${response.statusText}`);
136
137 if (!response.body) throw new Error('No body in SSE response');
138
139 const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
140
141 let buffer = '';
142
143 const abortHandler = () => {
144 try {
145 reader.cancel();
146 } catch {
147 // noop
148 }
149 };
150
151 signal.addEventListener('abort', abortHandler);
152
153 try {
154 while (true) {
155 const { done, value } = await reader.read();
156 if (done) break;
157 buffer += value;
158 buffer = buffer.replace(/\r\n?/g, '\n'); // normalize line endings
159
160 const chunks = buffer.split('\n\n');
161 buffer = chunks.pop() ?? '';
162
163 for (const chunk of chunks) {
164 const lines = chunk.split('\n');
165 const dataLines: Array<string> = [];
166 let eventName: string | undefined;
167
168 for (const line of lines) {
169 if (line.startsWith('data:')) {
170 dataLines.push(line.replace(/^data:\s*/, ''));
171 } else if (line.startsWith('event:')) {
172 eventName = line.replace(/^event:\s*/, '');
173 } else if (line.startsWith('id:')) {
174 lastEventId = line.replace(/^id:\s*/, '');
175 } else if (line.startsWith('retry:')) {
176 const parsed = Number.parseInt(line.replace(/^retry:\s*/, ''), 10);
177 if (!Number.isNaN(parsed)) {
178 retryDelay = parsed;
179 }
180 }
181 }
182
183 let data: unknown;
184 let parsedJson = false;
185
186 if (dataLines.length) {
187 const rawData = dataLines.join('\n');
188 try {
189 data = JSON.parse(rawData);
190 parsedJson = true;
191 } catch {
192 data = rawData;
193 }
194 }
195
196 if (parsedJson) {
197 if (responseValidator) {
198 await responseValidator(data);
199 }
200
201 if (responseTransformer) {
202 data = await responseTransformer(data);
203 }
204 }
205
206 onSseEvent?.({
207 data,
208 event: eventName,
209 id: lastEventId,
210 retry: retryDelay,
211 });
212
213 if (dataLines.length) {
214 yield data as any;
215 }
216 }
217 }
218 } finally {
219 signal.removeEventListener('abort', abortHandler);
220 reader.releaseLock();
221 }
222
223 break; // exit loop on normal completion
224 } catch (error) {
225 // connection failed or aborted; retry after delay
226 onSseError?.(error);
227
228 if (sseMaxRetryAttempts !== undefined && attempt >= sseMaxRetryAttempts) {
229 break; // stop after firing error
230 }
231
232 // exponential backoff: double retry each attempt, cap at 30s
233 const backoff = Math.min(retryDelay * 2 ** (attempt - 1), sseMaxRetryDelay ?? 30000);
234 await sleep(backoff);
235 }
236 }
237 };
238
239 const stream = createStream();
240
241 return { stream };
242}