A generic websocket connection with Zod schema validation and on message execution.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: correct reconnect attempts, add exhausted event, merge param methods #1

open opened by eric.wien targeting main from eric.wien/wah: fix/reconnect-router-params

Fixes 4 issues surfaced in a design review of the codebase.

Fixes#

  • Reconnect off-by-onescheduleReconnect checked >= maxAttempts after incrementing, so maxAttempts: N produced only N-1 real reconnects. Now checks before incrementing; maxAttempts: 3 yields 3 attempts per service, matching the documented contract.
  • New exhausted event — emitted when reconnection gives up after all attempts/cycles ({ services, attempts, cycles }); previously the give-up branch only logged, so consumers had no signal. Wired through WebSocketClient, documented in README + JSDoc.
  • Router throughput (demo)test-jetstream.ts rewritten to a single z.discriminatedUnion + one handler (O(1) dispatch on kind) instead of registering 3 overlapping schemas that ran a full safeParse per schema per message. The router code is unchanged; the canonical example now is also the fastest pattern.
  • Param method mergesetParams folded into updateParams(params, { immediate }). setParams kept as a deprecated alias for updateParams(params, { immediate: false }).

Tests#

Added regression guards in tests/client.test.ts: off-by-one attempt count (3 reconnecting + 1 exhausted), deferred params, setParams alias. Full suite: 55 passing. Typecheck, lint, build all green.

Behavior changes (review carefully)#

  • maxAttempts now does one more attempt per service than before — anyone relying on the old N-1 behavior gets N.
  • exhausted is a new public event on both the connection and client.
Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:4cvte3gr2l65lukolfy5rgma/sh.tangled.repo.pull/3mnk52z35bo22
+163 -42
Diff #0
+7 -2
README.md
··· 138 138 139 139 Sends data through the WebSocket. Objects are JSON-serialized. Returns `true` if sent. 140 140 141 - **`updateParams(params: Record<string, string | number | boolean>): void`** 141 + **`updateParams(params, options?: { immediate?: boolean }): void`** 142 142 143 - Merges new query parameters and reconnects. 143 + Merges new query parameters. By default (`immediate: true`) the connection reconnects so the new URL takes effect now. Pass `{ immediate: false }` to defer the change to the next reconnection without tearing down a healthy socket — useful for updating a resume cursor inside a `"close"` handler. 144 + 145 + **`setParams(params): void`** — _deprecated_ 146 + 147 + Alias for `updateParams(params, { immediate: false })`. 144 148 145 149 **`getConnectionInfo(): ConnectionInfo`** 146 150 ··· 155 159 | `"error"` | `Error \| HandlerError` | Connection error or handler error. | 156 160 | `"reconnecting"` | `{ attempt, maxAttempts, delay, service }` | About to reconnect. | 157 161 | `"serviceSwitched"` | `{ from, to, cycle }` | Failed over to a different service URL. | 162 + | `"exhausted"` | `{ services, attempts, cycles }` | Gave up reconnecting after all attempts and service cycles. | 158 163 159 164 ### `HandlerContext<T>` 160 165
+18 -13
src/WebSocketClient.ts
··· 114 114 } 115 115 116 116 /** 117 - * Merges new query parameters into the connection URL and reconnects. 117 + * Merges new query parameters into the connection URL. 118 118 * Existing parameters not present in the update are preserved. 119 119 * 120 + * By default the connection reconnects immediately so the new URL takes 121 + * effect. Pass `{ immediate: false }` to defer the change to the next 122 + * reconnection instead of tearing down a healthy socket. 123 + * 120 124 * @param params - Key-value pairs to merge into the current query parameters. 125 + * @param options - `immediate` (default `true`): reconnect now vs. on next reconnect. 121 126 * 122 127 * @example 123 128 * ```typescript 124 129 * // Initial connection to wss://example.com?channel=lobby 125 130 * client.updateParams({ channel: "game-1" }); 126 131 * // Reconnects to wss://example.com?channel=game-1 132 + * 133 + * // Update cursor on disconnect so reconnection resumes from last position 134 + * client.on("close", () => { 135 + * client.updateParams({ cursor: lastEventId }, { immediate: false }); 136 + * }); 127 137 * ``` 128 138 */ 129 - updateParams(params: Record<string, string | number | boolean>): void { 130 - this.connection.updateParams(params); 139 + updateParams( 140 + params: Record<string, string | number | boolean>, 141 + options: { immediate?: boolean } = {} 142 + ): void { 143 + this.connection.updateParams(params, options); 131 144 } 132 145 133 146 /** 134 147 * Merges new query parameters without triggering a reconnection. 135 - * The updated params take effect on the next connection attempt 136 - * (e.g., when the built-in reconnection fires after a disconnect). 137 148 * 149 + * @deprecated Use `updateParams(params, { immediate: false })`. Kept as an alias. 138 150 * @param params - Key-value pairs to merge into the current query parameters. 139 - * 140 - * @example 141 - * ```typescript 142 - * // Update cursor on disconnect so reconnection resumes from last position 143 - * client.on("close", () => { 144 - * client.setParams({ cursor: lastEventId }); 145 - * }); 146 - * ``` 147 151 */ 148 152 setParams(params: Record<string, string | number | boolean>): void { 149 153 this.connection.setParams(params); ··· 164 168 ); 165 169 this.connection.on("reconnecting", (info: unknown) => this.emit("reconnecting", info)); 166 170 this.connection.on("serviceSwitched", (info: unknown) => this.emit("serviceSwitched", info)); 171 + this.connection.on("exhausted", (info: unknown) => this.emit("exhausted", info)); 167 172 168 173 // Connection errors → unified "error" event 169 174 this.connection.on("error", (error: Error) => this.emit("error", error));
+31 -9
src/connection/WebSocketConnection.ts
··· 16 16 * - `"message"` — message received (emits the raw message data) 17 17 * - `"reconnecting"` — about to attempt reconnection (emits `{ attempt, maxAttempts, delay, service }`) 18 18 * - `"serviceSwitched"` — failed over to a different service URL (emits `{ from, to, cycle }`) 19 + * - `"exhausted"` — gave up reconnecting after all attempts/cycles (emits `{ services, attempts, cycles }`) 19 20 */ 20 21 export class WebSocketConnection extends Emitter { 21 22 private services: string[]; ··· 104 105 } 105 106 106 107 /** 107 - * Merges new query parameters into the current set and reconnects 108 - * so the updated URL takes effect. 108 + * Merges new query parameters into the current set. 109 + * 110 + * When `immediate` is `true` (the default), the current connection is closed 111 + * and reopened so the updated URL takes effect now. When `false`, the params 112 + * are stored and take effect on the next connection attempt (e.g. after a 113 + * disconnect triggers the built-in reconnection) without tearing down a 114 + * healthy socket. 109 115 */ 110 - updateParams(params: Record<string, string | number | boolean>): void { 116 + updateParams( 117 + params: Record<string, string | number | boolean>, 118 + options: { immediate?: boolean } = {} 119 + ): void { 111 120 this.queryParams = { ...this.queryParams, ...params }; 121 + 122 + if (options.immediate === false) { 123 + this.logger.debug("Query params set (deferred to next reconnect)", { 124 + params: this.queryParams, 125 + }); 126 + return; 127 + } 128 + 112 129 this.logger.info("Query params updated, reconnecting", { params: this.queryParams }); 113 130 114 131 // Graceful reconnect: close current, then re-run ··· 120 137 121 138 /** 122 139 * Merges new query parameters without triggering a reconnection. 123 - * The updated params will take effect on the next connection attempt 124 - * (e.g., after a disconnect triggers the built-in reconnection). 140 + * 141 + * @deprecated Use `updateParams(params, { immediate: false })`. Kept as an alias. 125 142 */ 126 143 setParams(params: Record<string, string | number | boolean>): void { 127 - this.queryParams = { ...this.queryParams, ...params }; 128 - this.logger.debug("Query params set (no reconnect)", { params: this.queryParams }); 144 + this.updateParams(params, { immediate: false }); 129 145 } 130 146 131 147 /** ··· 245 261 } 246 262 247 263 private scheduleReconnect(): void { 248 - this.reconnectAttempts++; 249 - 264 + // Have we already used all attempts for the current service? 250 265 if (this.reconnectAttempts >= this.reconnectConfig.maxAttempts) { 251 266 if (this.canTryNextService()) { 252 267 this.moveToNextService(); ··· 257 272 maxCycles: this.reconnectConfig.maxServiceCycles, 258 273 completedCycles: this.serviceCycles, 259 274 }); 275 + this.emit("exhausted", { 276 + services: [...this.services], 277 + attempts: this.reconnectAttempts, 278 + cycles: this.serviceCycles, 279 + }); 260 280 return; 261 281 } 262 282 283 + this.reconnectAttempts++; 284 + 263 285 const delay = Math.min( 264 286 this.reconnectConfig.initialDelay * 265 287 Math.pow(this.reconnectConfig.backoffFactor, this.reconnectAttempts - 1),
+40 -18
test-jetstream.ts
··· 3 3 4 4 // --------------------------------------------------------------------------- 5 5 // Jetstream message schemas 6 + // 7 + // Each event is keyed on a `kind` literal ("commit" | "identity" | "account"). 8 + // Instead of registering three separate schemas — which makes the router run a 9 + // full Zod validation per schema per message (O(handlers) on a firehose) — we 10 + // combine them into a single `z.discriminatedUnion`. Zod dispatches on `kind` 11 + // in O(1), and the whole stream is handled by one `handle()` registration. 6 12 // --------------------------------------------------------------------------- 7 13 8 14 const commitCreateSchema = z.object({ ··· 54 60 }), 55 61 }); 56 62 63 + // One union, dispatched on `kind` — the fast path for high-throughput streams. 64 + const eventSchema = z.discriminatedUnion("kind", [ 65 + commitCreateSchema, 66 + identitySchema, 67 + accountSchema, 68 + ]); 69 + 57 70 // --------------------------------------------------------------------------- 58 71 // Counters for summary 59 72 // --------------------------------------------------------------------------- ··· 87 100 }); 88 101 89 102 // --------------------------------------------------------------------------- 90 - // Register handlers 103 + // Register a single handler for the whole stream — `data.kind` narrows the type 91 104 // --------------------------------------------------------------------------- 92 105 93 - client.handle(commitCreateSchema, ({ data }) => { 94 - postCount++; 95 - const text = data.commit.record.text.slice(0, 80).replace(/\n/g, " "); 96 - const langs = data.commit.record.langs?.join(",") ?? "?"; 97 - const isReply = data.commit.record.reply ? " [reply]" : ""; 98 - console.log(`[POST #${postCount}] (${langs})${isReply} ${text}`); 99 - }); 100 - 101 - client.handle(identitySchema, ({ data }) => { 102 - identityCount++; 103 - console.log(`[IDENTITY] ${data.identity.handle} (${data.identity.did})`); 104 - }); 105 - 106 - client.handle(accountSchema, ({ data }) => { 107 - accountCount++; 108 - const status = data.account.active ? "active" : "inactive"; 109 - console.log(`[ACCOUNT] ${data.account.did} → ${status}`); 106 + client.handle(eventSchema, ({ data }) => { 107 + switch (data.kind) { 108 + case "commit": { 109 + postCount++; 110 + const text = data.commit.record.text.slice(0, 80).replace(/\n/g, " "); 111 + const langs = data.commit.record.langs?.join(",") ?? "?"; 112 + const isReply = data.commit.record.reply ? " [reply]" : ""; 113 + console.log(`[POST #${postCount}] (${langs})${isReply} ${text}`); 114 + break; 115 + } 116 + case "identity": { 117 + identityCount++; 118 + console.log(`[IDENTITY] ${data.identity.handle} (${data.identity.did})`); 119 + break; 120 + } 121 + case "account": { 122 + accountCount++; 123 + const status = data.account.active ? "active" : "inactive"; 124 + console.log(`[ACCOUNT] ${data.account.did} → ${status}`); 125 + break; 126 + } 127 + } 110 128 }); 111 129 112 130 // --------------------------------------------------------------------------- ··· 133 151 console.log("[SERVICE SWITCHED]", info); 134 152 }); 135 153 154 + client.on("exhausted", (info) => { 155 + console.error("[EXHAUSTED] gave up reconnecting", info); 156 + }); 157 + 136 158 // --------------------------------------------------------------------------- 137 159 // Connect and auto-close after 15 seconds 138 160 // ---------------------------------------------------------------------------
+67
tests/client.test.ts
··· 222 222 ); 223 223 }); 224 224 }); 225 + 226 + it("reconnects exactly maxAttempts times, then emits exhausted", () => { 227 + vi.useFakeTimers(); 228 + const client = createClient({ 229 + reconnect: { maxAttempts: 3, initialDelay: 10, backoffFactor: 1, maxServiceCycles: 2 }, 230 + }); 231 + const onReconnecting = vi.fn(); 232 + const onExhausted = vi.fn(); 233 + client.on("reconnecting", onReconnecting); 234 + client.on("exhausted", onExhausted); 235 + 236 + client.connect(); 237 + let ws = lastWs(); 238 + ws.readyState = WS_READY_STATE.OPEN; 239 + ws.onopen!({}); 240 + 241 + const drop = () => { 242 + ws.readyState = WS_READY_STATE.CLOSED; 243 + ws.onclose!({ code: 1006, reason: "" }); 244 + }; 245 + 246 + drop(); // schedules reconnect attempt 1 247 + for (let i = 0; i < 3; i++) { 248 + vi.advanceTimersByTime(10); // fire the reconnect timer → new socket 249 + ws = lastWs(); 250 + drop(); // and fail that attempt 251 + } 252 + 253 + // maxAttempts: 3 must yield 3 real reconnect attempts (not 2 — regression guard) 254 + expect(onReconnecting).toHaveBeenCalledTimes(3); 255 + expect(onExhausted).toHaveBeenCalledTimes(1); 256 + expect(onExhausted).toHaveBeenCalledWith(expect.objectContaining({ attempts: 3 })); 257 + 258 + vi.useRealTimers(); 259 + }); 260 + 261 + it("updateParams({ immediate: false }) defers without reconnecting", () => { 262 + const client = createClient(); 263 + client.connect(); 264 + const ws = lastWs(); 265 + ws.readyState = WS_READY_STATE.OPEN; 266 + ws.onopen!({}); 267 + const countBefore = mockWsInstances.length; 268 + 269 + client.updateParams({ cursor: "123" }, { immediate: false }); 270 + 271 + expect(mockWsInstances.length).toBe(countBefore); // no new socket 272 + expect(ws.close).not.toHaveBeenCalled(); 273 + expect(client.getConnectionInfo().currentService).toContain("cursor=123"); 274 + 275 + client.updateParams({ cursor: "456" }); // default immediate: true reconnects 276 + expect(mockWsInstances.length).toBe(countBefore + 1); 277 + }); 278 + 279 + it("setParams is an alias for updateParams({ immediate: false })", () => { 280 + const client = createClient(); 281 + client.connect(); 282 + const ws = lastWs(); 283 + ws.readyState = WS_READY_STATE.OPEN; 284 + ws.onopen!({}); 285 + const countBefore = mockWsInstances.length; 286 + 287 + client.setParams({ cursor: "abc" }); 288 + 289 + expect(mockWsInstances.length).toBe(countBefore); // no reconnect 290 + expect(client.getConnectionInfo().currentService).toContain("cursor=abc"); 291 + }); 225 292 });

History

1 round 0 comments
sign up or login to add to the discussion
eric.wien submitted #0
1 commit
expand
fix: correct reconnect attempts, add exhausted event, merge param methods
merge conflicts detected
expand
  • README.md:138
  • src/WebSocketClient.ts:114
  • src/connection/WebSocketConnection.ts:16
  • test-jetstream.ts:3
  • tests/client.test.ts:222
expand 0 comments