Suite of AT Protocol TypeScript libraries built on web standards
20
fork

Configure Feed

Select the types of activity you want to include in your feed.

revert ws change

+158 -78
+1 -1
common/deno.json
··· 1 1 { 2 2 "name": "@atp/common", 3 - "version": "0.1.0-alpha.8", 3 + "version": "0.1.0-alpha.9", 4 4 "exports": { 5 5 ".": "./mod.ts", 6 6 "./server": "./server.ts"
+1 -7
common/util.ts
··· 180 180 } 181 181 } 182 182 return output; 183 - }; 184 - 185 - export const isErrnoException = ( 186 - err: unknown, 187 - ): err is NodeJS.ErrnoException => { 188 - return !!err && (err as Record<string, unknown>)["code"] !== undefined; 189 - }; 183 + };
+1 -1
xrpc-server/deno.json
··· 1 1 { 2 2 "name": "@atp/xrpc-server", 3 - "version": "0.1.0-alpha.7", 3 + "version": "0.1.0-alpha.8", 4 4 "exports": "./mod.ts", 5 5 "license": "MIT", 6 6 "imports": {
+155 -69
xrpc-server/stream/websocket-keepalive.ts
··· 1 - import { type ClientOptions, createWebSocketStream, WebSocket } from "ws"; 2 - import { isErrnoException, SECOND, wait } from "@atp/common"; 1 + // websocket-keepalive.ts 2 + // Runtime-agnostic (Deno / Workers / Bun / Browser) 3 + 4 + import { SECOND, wait } from "@atp/common"; 3 5 import { CloseCode, DisconnectError } from "./types.ts"; 6 + import { iterateBinary } from "./stream.ts"; 4 7 5 - export type KeepAliveOptions = ClientOptions & { 8 + /** 9 + * Options for a {@link WebSocketKeepAlive} instance 10 + * 11 + * @prop getUrl Method to get the current URL of the websocket endpoint 12 + * @prop maxReconnectSeconds Maximum time a request can take to reconnect 13 + * @prop signal Abort signal to send when aborting connection 14 + * 15 + * @prop heartbeatIntervalMs Interval to send provided heartbeatPayload on, 16 + * @prop heartbeatPayload Method to create payload to send for heartbeat 17 + * @prop isPong If provided, we mark alive only when it returns true for a message 18 + * if omitted, *any* message is considered proof of life 19 + * 20 + * @prop onReconnectError Reconnect hook 21 + * 22 + * @prop createSocket Socket factory override (lets you use custom client if needed) 23 + * @prop protocols Override value for accepted protocols 24 + */ 25 + export type KeepAliveOptions = { 6 26 getUrl: () => Promise<string>; 7 27 maxReconnectSeconds?: number; 8 28 signal?: AbortSignal; 9 - heartbeatIntervalMs?: number; 10 - onReconnectError?: ( 11 - error: unknown, 12 - n: number, 13 - initialSetup: boolean, 14 - ) => void; 29 + 30 + heartbeatIntervalMs?: number; // default 10 * SECOND 31 + heartbeatPayload?: () => string | ArrayBuffer | Uint8Array | Blob; 32 + isPong?: (data: unknown) => boolean; 33 + 34 + // Reconnect hook 35 + onReconnectError?: (error: unknown, n: number, initialSetup: boolean) => void; 36 + 37 + createSocket?: (url: string, protocols?: string | string[]) => WebSocket; 38 + protocols?: string | string[]; 15 39 }; 16 40 17 41 export class WebSocketKeepAlive { ··· 19 43 public initialSetup = true; 20 44 public reconnects: number | null = null; 21 45 46 + /** 47 + * Creates a new WebSocketKeepAlive instance. 48 + * @param opts Configuration options for keepalive, heartbeat, reconnect, and socket creation. 49 + */ 22 50 constructor(public opts: KeepAliveOptions) {} 23 51 24 52 async *[Symbol.asyncIterator](): AsyncGenerator<Uint8Array> { 25 53 const maxReconnectMs = 1000 * (this.opts.maxReconnectSeconds ?? 64); 54 + 26 55 while (true) { 27 56 if (this.reconnects !== null) { 28 57 const duration = this.initialSetup ··· 30 59 : backoffMs(this.reconnects++, maxReconnectMs); 31 60 await wait(duration); 32 61 } 62 + 33 63 const url = await this.opts.getUrl(); 34 - this.ws = new WebSocket(url, this.opts); 64 + 65 + // Create a web-standard WebSocket (or a custom one if provided). 66 + const ws = this.opts.createSocket?.(url, this.opts.protocols) ?? 67 + new WebSocket(url, this.opts.protocols); 68 + this.ws = ws; 69 + 35 70 const ac = new AbortController(); 36 71 if (this.opts.signal) { 37 72 forwardSignal(this.opts.signal, ac); 38 73 } 39 - this.ws.once("open", () => { 40 - this.initialSetup = false; 41 - this.reconnects = 0; 42 - if (this.ws) { 43 - this.startHeartbeat(this.ws); 44 - } 45 - }); 46 - this.ws.once("close", (code: number, reason: Uint8Array) => { 47 - if (code === CloseCode.Abnormal) { 48 - ac.abort( 49 - new AbnormalCloseError( 50 - `Abnormal ws close: ${new TextDecoder().decode(reason)}`, 51 - ), 52 - ); 53 - } 54 - }); 74 + 75 + // Track liveness (application-level heartbeat) 76 + this.startHeartbeat(ws, ac); 77 + 78 + // When the socket opens, reset backoff. 79 + ws.addEventListener( 80 + "open", 81 + () => { 82 + this.initialSetup = false; 83 + this.reconnects = 0; 84 + }, 85 + { once: true }, 86 + ); 87 + 88 + // Distinguish abnormal close → treat as reconnectable error 89 + ws.addEventListener( 90 + "close", 91 + (ev) => { 92 + if (ev.code === CloseCode.Abnormal) { 93 + ac.abort( 94 + new AbnormalCloseError( 95 + `Abnormal ws close: ${String(ev.reason || "")}`, 96 + ), 97 + ); 98 + } 99 + }, 100 + { once: true }, 101 + ); 55 102 56 103 try { 57 - const wsStream = createWebSocketStream(this.ws, { 58 - signal: ac.signal, 59 - readableObjectMode: true, 60 - }); 61 - for await (const chunk of wsStream) { 104 + // Iterate incoming binary chunks 105 + for await (const chunk of iterateBinary(ws)) { 62 106 yield chunk; 63 107 } 64 - } catch (_err) { 65 - const err = isErrnoException(_err) && _err.code === "ABORT_ERR" 66 - ? _err.cause 67 - : _err; 108 + } catch (error) { 109 + // Normalize Abort into same shape your old code expected. 110 + const err = (error as Error)?.name === "AbortError" 111 + ? (error as Error).cause ?? error 112 + : error; 113 + 68 114 if (err instanceof DisconnectError) { 69 - this.ws?.close(err.wsCode); 115 + // We cleanly end the connection 116 + ws?.close(err.wsCode); 70 117 break; 71 118 } 72 - this.ws?.close(); 119 + 120 + // Close if not already closing 121 + ws.close(); 122 + 73 123 if (isReconnectable(err)) { 74 - this.reconnects ??= 0; 124 + this.reconnects ??= 0; // Never reconnect when null 75 125 this.opts.onReconnectError?.(err, this.reconnects, this.initialSetup); 76 - continue; 126 + continue; // loop to reconnect 77 127 } else { 78 128 throw err; 79 129 } 80 130 } 131 + 132 + // Other side ended stream cleanly; stop iterating. 81 133 break; 82 134 } 83 135 } 84 136 85 - startHeartbeat(ws: WebSocket) { 137 + /** Application-level heartbeat (web standard). 138 + * 139 + * In Node's `ws` you used `ping`/`pong`. Those do not exist in web sockets. 140 + * Here we: 141 + * - periodically send `heartbeatPayload()` if provided 142 + * - consider the connection "alive" when: 143 + * * `isPong(ev.data)` returns true (if provided), OR 144 + * * *any* message is received (fallback) 145 + * - if no proof of life for one interval, we close the socket (which triggers reconnect) 146 + */ 147 + private startHeartbeat(ws: WebSocket, ac: AbortController) { 148 + const intervalMs = this.opts.heartbeatIntervalMs ?? 10 * SECOND; 149 + 86 150 let isAlive = true; 87 - let heartbeatInterval: ReturnType<typeof setInterval> | null = null; 151 + let timer: number | null = null; 88 152 89 - const checkAlive = () => { 153 + const onMessage = (ev: MessageEvent) => { 154 + // If a custom pong detector exists, use it; otherwise any message counts. 155 + if (!this.opts.isPong || this.opts.isPong(ev.data)) { 156 + isAlive = true; 157 + } 158 + }; 159 + 160 + const tick = () => { 90 161 if (!isAlive) { 91 - return ws.terminate(); 162 + // No pong/traffic since last tick → consider dead and close. 163 + ws.close(1000); 164 + // Abort the iterator with a recognizable shape like before. 165 + const domErr = new DOMException("Aborted", "AbortError"); 166 + domErr.cause = new DisconnectError( 167 + CloseCode.Abnormal, 168 + "HeartbeatTimeout", 169 + ); 170 + ac.abort(domErr); 171 + return; 92 172 } 93 173 isAlive = false; 94 - ws.ping(); 174 + 175 + const payload = this.opts.heartbeatPayload?.(); 176 + if (payload !== undefined) { 177 + ws.send(payload); 178 + } 95 179 }; 96 180 97 - checkAlive(); 98 - heartbeatInterval = setInterval( 99 - checkAlive, 100 - this.opts.heartbeatIntervalMs ?? 10 * SECOND, 101 - ); 181 + // Prime one cycle and schedule subsequent ones 182 + tick(); 183 + timer = setInterval(tick, intervalMs) as unknown as number; 102 184 103 - ws.on("pong", () => { 104 - isAlive = true; 105 - }); 106 - ws.once("close", () => { 107 - if (heartbeatInterval) { 108 - clearInterval(heartbeatInterval); 109 - heartbeatInterval = null; 110 - } 111 - }); 185 + ws.addEventListener("message", onMessage); 186 + ws.addEventListener( 187 + "close", 188 + () => { 189 + if (timer !== null) { 190 + clearInterval(timer); 191 + timer = null; 192 + } 193 + ws.removeEventListener("message", onMessage); 194 + }, 195 + { once: true }, 196 + ); 112 197 } 113 198 } 114 199 ··· 119 204 } 120 205 121 206 function isReconnectable(err: unknown): boolean { 122 - if (isErrnoException(err) && typeof err.code === "string") { 123 - return networkErrorCodes.includes(err.code); 124 - } 125 - return false; 207 + // Network-ish errors are reconnectable. Keep your previous codes. 208 + if (!err || typeof err !== "object") return false; 209 + const e = err as { name?: unknown; code?: unknown }; 210 + if (typeof e.name !== "string") return false; 211 + return typeof e.code === "string" && networkErrorCodes.includes(e.code); 126 212 } 127 213 128 214 const networkErrorCodes = [ ··· 133 219 "EPIPE", 134 220 "ETIMEDOUT", 135 221 "ECANCELED", 222 + "ABORT_ERR", // surface our aborts as reconnectable if you want 136 223 ]; 137 224 138 225 function backoffMs(n: number, maxMs: number) { 139 - const baseSec = Math.pow(2, n); 140 - const randSec = Math.random() - 0.5; 226 + const baseSec = Math.pow(2, n); // 1, 2, 4, ... 227 + const randSec = Math.random() - 0.5; // jitter [-0.5, +0.5] 141 228 const ms = 1000 * (baseSec + randSec); 142 229 return Math.min(ms, maxMs); 143 230 } ··· 145 232 function forwardSignal(signal: AbortSignal, ac: AbortController) { 146 233 if (signal.aborted) { 147 234 return ac.abort(signal.reason); 148 - } else { 149 - signal.addEventListener("abort", () => ac.abort(signal.reason), { 150 - signal: ac.signal, 151 - }); 152 235 } 153 - } 236 + const onAbort = () => ac.abort(signal.reason); 237 + // Use AbortSignal.any? Not universally available; just add/remove. 238 + signal.addEventListener("abort", onAbort, { signal: ac.signal }); 239 + }