Suite of AT Protocol TypeScript libraries built on web standards
20
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: use ws for keepalive

+90 -180
+6
common/util.ts
··· 181 181 } 182 182 return output; 183 183 }; 184 + 185 + export const isErrnoException = ( 186 + err: unknown, 187 + ): err is NodeJS.ErrnoException => { 188 + return !!err && (err as Record<string, unknown>)["code"] !== undefined; 189 + };
+10 -22
deno.lock
··· 35 35 "jsr:@ts-morph/ts-morph@26": "26.0.0", 36 36 "jsr:@zod/zod@^4.1.11": "4.1.13", 37 37 "jsr:@zod/zod@^4.1.13": "4.1.13", 38 - "npm:@atproto/crypto@*": "0.4.4", 38 + "npm:@atproto/crypto@*": "0.1.0", 39 39 "npm:@did-plc/lib@^0.0.4": "0.0.4", 40 40 "npm:@did-plc/server@^0.0.1": "0.0.1_express@4.21.2", 41 41 "npm:@ipld/dag-cbor@^9.2.5": "9.2.5", ··· 45 45 "npm:multiformats@^13.4.1": "13.4.1", 46 46 "npm:p-queue@^8.1.1": "8.1.1", 47 47 "npm:prettier@^3.6.2": "3.6.2", 48 - "npm:rate-limiter-flexible@9": "9.0.0" 48 + "npm:rate-limiter-flexible@9": "9.0.0", 49 + "npm:ws@^8.18.0": "8.18.3" 49 50 }, 50 51 "jsr": { 51 52 "@cliffy/ansi@1.0.0-rc.8": { ··· 221 222 "uint8arrays" 222 223 ] 223 224 }, 224 - "@atproto/crypto@0.4.4": { 225 - "integrity": "sha512-Yq9+crJ7WQl7sxStVpHgie5Z51R05etaK9DLWYG/7bR5T4bhdcIgF6IfklLShtZwLYdVVj+K15s0BqW9a8PSDA==", 226 - "dependencies": [ 227 - "@noble/curves", 228 - "@noble/hashes", 229 - "uint8arrays" 230 - ] 231 - }, 232 225 "@did-plc/lib@0.0.4": { 233 226 "integrity": "sha512-Omeawq3b8G/c/5CtkTtzovSOnWuvIuCI4GTJNrt1AmCskwEQV7zbX5d6km1mjJNbE0gHuQPTVqZxLVqetNbfwA==", 234 227 "dependencies": [ 235 228 "@atproto/common@0.1.1", 236 - "@atproto/crypto@0.1.0", 229 + "@atproto/crypto", 237 230 "@ipld/dag-cbor@7.0.3", 238 231 "axios", 239 232 "multiformats@9.9.0", ··· 245 238 "integrity": "sha512-GtxxHcOrOQ6fNI1ufq3Zqjc2PtWqPZOdsuzlwtxiH9XibUGwDkb0GmaBHyU5GiOxOKZEW1GspZ8mreBA6XOlTQ==", 246 239 "dependencies": [ 247 240 "@atproto/common@0.1.0", 248 - "@atproto/crypto@0.1.0", 241 + "@atproto/crypto", 249 242 "@did-plc/lib", 250 243 "axios", 251 244 "cors", ··· 272 265 "cborg@4.2.15", 273 266 "multiformats@13.4.1" 274 267 ] 275 - }, 276 - "@noble/curves@1.9.7": { 277 - "integrity": "sha512-gbKGcRUYIjA3/zCCNaWDciTMFI0dCkvou3TL8Zmy5Nc7sJ47a0jtOeZoTaMxkuqRo9cRhjOdZJXegxYE5FN/xw==", 278 - "dependencies": [ 279 - "@noble/hashes" 280 - ] 281 - }, 282 - "@noble/hashes@1.8.0": { 283 - "integrity": "sha512-jCs9ldd7NwzpgXDIf6P3+NrHh9/sD6CQdxHyjQI+h/6rDNo88ypBxxz45UDuZHz9r3tNz7N/VInSVoVdtXEI4A==" 284 268 }, 285 269 "@noble/secp256k1@1.7.2": { 286 270 "integrity": "sha512-/qzwYl5eFLH8OWIecQWM31qld2g1NfjgylK+TNhqtaUKP37Nm+Y+z30Fjhw0Ct8p9yCQEm2N3W/AckdIb3SMcQ==" ··· 1094 1078 "vary@1.1.2": { 1095 1079 "integrity": "sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg==" 1096 1080 }, 1081 + "ws@8.18.3": { 1082 + "integrity": "sha512-PEIGCY5tSlUt50cqyMXfCzX+oOPqN0vuGqWzbcJ2xvnkzkq46oOpz7dQaTDBdfICb4N14+GARUDw2XV2N4tvzg==" 1083 + }, 1097 1084 "xtend@4.0.2": { 1098 1085 "integrity": "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==" 1099 1086 }, ··· 1193 1180 "npm:get-port@^7.1.0", 1194 1181 "npm:key-encoder@^2.0.3", 1195 1182 "npm:multiformats@^13.4.1", 1196 - "npm:rate-limiter-flexible@9" 1183 + "npm:rate-limiter-flexible@9", 1184 + "npm:ws@^8.18.0" 1197 1185 ] 1198 1186 } 1199 1187 }
+1 -1
sync/deno.json
··· 1 1 { 2 2 "name": "@atp/sync", 3 - "version": "0.1.0-alpha.5", 3 + "version": "0.1.0-alpha.6", 4 4 "exports": "./mod.ts", 5 5 "license": "MIT", 6 6 "imports": {
+1 -1
sync/firehose/index.ts
··· 250 250 if (!parsed) { 251 251 continue; 252 252 } 253 - await this.opts.runner.trackEvent( 253 + this.opts.runner.trackEvent( 254 254 parsed.did, 255 255 parsed.seq, 256 256 async () => {
+3 -2
xrpc-server/deno.json
··· 1 1 { 2 2 "name": "@atp/xrpc-server", 3 - "version": "0.1.0-alpha.6", 3 + "version": "0.1.0-alpha.7", 4 4 "exports": "./mod.ts", 5 5 "license": "MIT", 6 6 "imports": { ··· 11 11 "multiformats": "npm:multiformats@^13.4.1", 12 12 "zod": "jsr:@zod/zod@^4.1.13", 13 13 "hono": "jsr:@hono/hono@^4.10.8", 14 - "rate-limiter-flexible": "npm:rate-limiter-flexible@^9.0.0" 14 + "rate-limiter-flexible": "npm:rate-limiter-flexible@^9.0.0", 15 + "ws": "npm:ws@^8.18.0" 15 16 }, 16 17 "test": { 17 18 "permissions": {
+69 -154
xrpc-server/stream/websocket-keepalive.ts
··· 1 - // websocket-keepalive.ts 2 - // Runtime-agnostic (Deno / Workers / Bun / Browser) 3 - 4 - import { SECOND, wait } from "@atp/common"; 1 + import { type ClientOptions, WebSocket, createWebSocketStream } from "ws"; 2 + import { isErrnoException, SECOND, wait } from "@atp/common"; 5 3 import { CloseCode, DisconnectError } from "./types.ts"; 6 - import { iterateBinary } from "./stream.ts"; 7 4 8 - /** 9 - * Options for a {@link WebSocketKeepAlive} instance 10 - * 11 - * @prop getUrl Method to get the current URL of the websocket endpoint 12 - * @prop maxReconnectSeconds Maximum time a request can take to reconnect 13 - * @prop signal Abort signal to send when aborting connection 14 - * 15 - * @prop heartbeatIntervalMs Interval to send provided heartbeatPayload on, 16 - * @prop heartbeatPayload Method to create payload to send for heartbeat 17 - * @prop isPong If provided, we mark alive only when it returns true for a message 18 - * if omitted, *any* message is considered proof of life 19 - * 20 - * @prop onReconnectError Reconnect hook 21 - * 22 - * @prop createSocket Socket factory override (lets you use custom client if needed) 23 - * @prop protocols Override value for accepted protocols 24 - */ 25 - export type KeepAliveOptions = { 5 + export type KeepAliveOptions = ClientOptions & { 26 6 getUrl: () => Promise<string>; 27 7 maxReconnectSeconds?: number; 28 8 signal?: AbortSignal; 29 - 30 - heartbeatIntervalMs?: number; // default 10 * SECOND 31 - heartbeatPayload?: () => string | ArrayBuffer | Uint8Array | Blob; 32 - isPong?: (data: unknown) => boolean; 33 - 34 - // Reconnect hook 35 - onReconnectError?: (error: unknown, n: number, initialSetup: boolean) => void; 36 - 37 - createSocket?: (url: string, protocols?: string | string[]) => WebSocket; 38 - protocols?: string | string[]; 9 + heartbeatIntervalMs?: number; 10 + onReconnectError?: ( 11 + error: unknown, 12 + n: number, 13 + initialSetup: boolean, 14 + ) => void; 39 15 }; 40 16 41 17 export class WebSocketKeepAlive { ··· 43 19 public initialSetup = true; 44 20 public reconnects: number | null = null; 45 21 46 - /** 47 - * Creates a new WebSocketKeepAlive instance. 48 - * @param opts Configuration options for keepalive, heartbeat, reconnect, and socket creation. 49 - */ 50 22 constructor(public opts: KeepAliveOptions) {} 51 23 52 24 async *[Symbol.asyncIterator](): AsyncGenerator<Uint8Array> { 53 25 const maxReconnectMs = 1000 * (this.opts.maxReconnectSeconds ?? 64); 54 - 55 26 while (true) { 56 27 if (this.reconnects !== null) { 57 28 const duration = this.initialSetup ··· 59 30 : backoffMs(this.reconnects++, maxReconnectMs); 60 31 await wait(duration); 61 32 } 62 - 63 33 const url = await this.opts.getUrl(); 64 - 65 - // Create a web-standard WebSocket (or a custom one if provided). 66 - const ws = this.opts.createSocket?.(url, this.opts.protocols) ?? 67 - new WebSocket(url, this.opts.protocols); 68 - this.ws = ws; 69 - 34 + this.ws = new WebSocket(url, this.opts); 70 35 const ac = new AbortController(); 71 36 if (this.opts.signal) { 72 37 forwardSignal(this.opts.signal, ac); 73 38 } 74 - 75 - // Track liveness (application-level heartbeat) 76 - this.startHeartbeat(ws, ac); 77 - 78 - // When the socket opens, reset backoff. 79 - ws.addEventListener( 80 - "open", 81 - () => { 82 - this.initialSetup = false; 83 - this.reconnects = 0; 84 - }, 85 - { once: true }, 86 - ); 87 - 88 - // Distinguish abnormal close → treat as reconnectable error 89 - ws.addEventListener( 90 - "close", 91 - (ev) => { 92 - if (ev.code === CloseCode.Abnormal) { 93 - ac.abort( 94 - new AbnormalCloseError( 95 - `Abnormal ws close: ${String(ev.reason || "")}`, 96 - ), 97 - ); 98 - } 99 - }, 100 - { once: true }, 101 - ); 39 + this.ws.once("open", () => { 40 + this.initialSetup = false; 41 + this.reconnects = 0; 42 + if (this.ws) { 43 + this.startHeartbeat(this.ws); 44 + } 45 + }); 46 + this.ws.once("close", (code: number, reason: Uint8Array) => { 47 + if (code === CloseCode.Abnormal) { 48 + ac.abort( 49 + new AbnormalCloseError( 50 + `Abnormal ws close: ${new TextDecoder().decode(reason)}`, 51 + ), 52 + ); 53 + } 54 + }); 102 55 103 56 try { 104 - // Iterate incoming binary chunks 105 - for await (const chunk of iterateBinary(ws)) { 57 + const wsStream = createWebSocketStream(this.ws, { 58 + signal: ac.signal, 59 + readableObjectMode: true, 60 + }); 61 + for await (const chunk of wsStream) { 106 62 yield chunk; 107 63 } 108 - } catch (error) { 109 - // Normalize Abort into same shape your old code expected. 110 - const err = (error as Error)?.name === "AbortError" 111 - ? (error as Error).cause ?? error 112 - : error; 113 - 64 + } catch (_err) { 65 + const err = 66 + isErrnoException(_err) && _err.code === "ABORT_ERR" 67 + ? _err.cause 68 + : _err; 114 69 if (err instanceof DisconnectError) { 115 - // We cleanly end the connection 116 - ws?.close(err.wsCode); 70 + this.ws?.close(err.wsCode); 117 71 break; 118 72 } 119 - 120 - // Close if not already closing 121 - ws.close(); 122 - 73 + this.ws?.close(); 123 74 if (isReconnectable(err)) { 124 - this.reconnects ??= 0; // Never reconnect when null 75 + this.reconnects ??= 0; 125 76 this.opts.onReconnectError?.(err, this.reconnects, this.initialSetup); 126 - continue; // loop to reconnect 77 + continue; 127 78 } else { 128 79 throw err; 129 80 } 130 81 } 131 - 132 - // Other side ended stream cleanly; stop iterating. 133 82 break; 134 83 } 135 84 } 136 85 137 - /** Application-level heartbeat (web standard). 138 - * 139 - * In Node's `ws` you used `ping`/`pong`. Those do not exist in web sockets. 140 - * Here we: 141 - * - periodically send `heartbeatPayload()` if provided 142 - * - consider the connection "alive" when: 143 - * * `isPong(ev.data)` returns true (if provided), OR 144 - * * *any* message is received (fallback) 145 - * - if no proof of life for one interval, we close the socket (which triggers reconnect) 146 - */ 147 - private startHeartbeat(ws: WebSocket, ac: AbortController) { 148 - const intervalMs = this.opts.heartbeatIntervalMs ?? 10 * SECOND; 149 - 86 + startHeartbeat(ws: WebSocket) { 150 87 let isAlive = true; 151 - let timer: number | null = null; 152 - 153 - const onMessage = (ev: MessageEvent) => { 154 - // If a custom pong detector exists, use it; otherwise any message counts. 155 - if (!this.opts.isPong || this.opts.isPong(ev.data)) { 156 - isAlive = true; 157 - } 158 - }; 88 + let heartbeatInterval: ReturnType<typeof setInterval> | null = null; 159 89 160 - const tick = () => { 90 + const checkAlive = () => { 161 91 if (!isAlive) { 162 - // No pong/traffic since last tick → consider dead and close. 163 - ws.close(1000); 164 - // Abort the iterator with a recognizable shape like before. 165 - const domErr = new DOMException("Aborted", "AbortError"); 166 - domErr.cause = new DisconnectError( 167 - CloseCode.Abnormal, 168 - "HeartbeatTimeout", 169 - ); 170 - ac.abort(domErr); 171 - return; 92 + return ws.terminate(); 172 93 } 173 94 isAlive = false; 174 - 175 - const payload = this.opts.heartbeatPayload?.(); 176 - if (payload !== undefined) { 177 - ws.send(payload); 178 - } 95 + ws.ping(); 179 96 }; 180 97 181 - // Prime one cycle and schedule subsequent ones 182 - tick(); 183 - timer = setInterval(tick, intervalMs) as unknown as number; 98 + checkAlive(); 99 + heartbeatInterval = setInterval( 100 + checkAlive, 101 + this.opts.heartbeatIntervalMs ?? 10 * SECOND, 102 + ); 184 103 185 - ws.addEventListener("message", onMessage); 186 - ws.addEventListener( 187 - "close", 188 - () => { 189 - if (timer !== null) { 190 - clearInterval(timer); 191 - timer = null; 192 - } 193 - ws.removeEventListener("message", onMessage); 194 - }, 195 - { once: true }, 196 - ); 104 + ws.on("pong", () => { 105 + isAlive = true; 106 + }); 107 + ws.once("close", () => { 108 + if (heartbeatInterval) { 109 + clearInterval(heartbeatInterval); 110 + heartbeatInterval = null; 111 + } 112 + }); 197 113 } 198 114 } 199 115 ··· 204 120 } 205 121 206 122 function isReconnectable(err: unknown): boolean { 207 - // Network-ish errors are reconnectable. Keep your previous codes. 208 - if (!err || typeof err !== "object") return false; 209 - const e = err as { name?: unknown; code?: unknown }; 210 - if (typeof e.name !== "string") return false; 211 - return typeof e.code === "string" && networkErrorCodes.includes(e.code); 123 + if (isErrnoException(err) && typeof err.code === "string") { 124 + return networkErrorCodes.includes(err.code); 125 + } 126 + return false; 212 127 } 213 128 214 129 const networkErrorCodes = [ ··· 219 134 "EPIPE", 220 135 "ETIMEDOUT", 221 136 "ECANCELED", 222 - "ABORT_ERR", // surface our aborts as reconnectable if you want 223 137 ]; 224 138 225 139 function backoffMs(n: number, maxMs: number) { 226 - const baseSec = Math.pow(2, n); // 1, 2, 4, ... 227 - const randSec = Math.random() - 0.5; // jitter [-0.5, +0.5] 140 + const baseSec = Math.pow(2, n); 141 + const randSec = Math.random() - 0.5; 228 142 const ms = 1000 * (baseSec + randSec); 229 143 return Math.min(ms, maxMs); 230 144 } ··· 232 146 function forwardSignal(signal: AbortSignal, ac: AbortController) { 233 147 if (signal.aborted) { 234 148 return ac.abort(signal.reason); 149 + } else { 150 + signal.addEventListener("abort", () => ac.abort(signal.reason), { 151 + signal: ac.signal, 152 + }); 235 153 } 236 - const onAbort = () => ac.abort(signal.reason); 237 - // Use AbortSignal.any? Not universally available; just add/remove. 238 - signal.addEventListener("abort", onAbort, { signal: ac.signal }); 239 154 }