source dump of claude code
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 800 lines 28 kB view raw
import type { StdoutMessage } from 'src/entrypoints/sdk/controlTypes.js'
import type WsWebSocket from 'ws'
import { logEvent } from '../../services/analytics/index.js'
import { CircularBuffer } from '../../utils/CircularBuffer.js'
import { logForDebugging } from '../../utils/debug.js'
import { logForDiagnosticsNoPII } from '../../utils/diagLogs.js'
import { isEnvTruthy } from '../../utils/envUtils.js'
import { getWebSocketTLSOptions } from '../../utils/mtls.js'
import {
  getWebSocketProxyAgent,
  getWebSocketProxyUrl,
} from '../../utils/proxy.js'
import {
  registerSessionActivityCallback,
  unregisterSessionActivityCallback,
} from '../../utils/sessionActivity.js'
import { jsonStringify } from '../../utils/slowOperations.js'
import type { Transport } from './Transport.js'

/** Data frame sent periodically to reset proxy idle timers. */
const KEEP_ALIVE_FRAME = '{"type":"keep_alive"}\n'

/** Capacity of the replay buffer (number of messages). */
const DEFAULT_MAX_BUFFER_SIZE = 1000
/** First reconnect delay in ms; doubled per attempt up to the max below. */
const DEFAULT_BASE_RECONNECT_DELAY = 1000
/** Upper bound on the (pre-jitter) reconnect delay in ms. */
const DEFAULT_MAX_RECONNECT_DELAY = 30000
/** Time budget for reconnection attempts before giving up (10 minutes). */
const DEFAULT_RECONNECT_GIVE_UP_MS = 600_000
/** How often a ping is sent (and the previous pong checked), in ms. */
const DEFAULT_PING_INTERVAL = 10000
const DEFAULT_KEEPALIVE_INTERVAL = 300_000 // 5 minutes

/**
 * Threshold for detecting system sleep/wake. If the gap between consecutive
 * reconnection attempts exceeds this, the machine likely slept. We reset
 * the reconnection budget and retry — the server will reject with permanent
 * close codes (4001/1002) if the session was reaped during sleep.
 */
const SLEEP_DETECTION_THRESHOLD_MS = DEFAULT_MAX_RECONNECT_DELAY * 2 // 60s

/**
 * WebSocket close codes that indicate a permanent server-side rejection.
 * The transport transitions to 'closed' immediately without retrying
 * (except 4003 when refreshHeaders yields a new Authorization header).
 */
const PERMANENT_CLOSE_CODES = new Set([
  1002, // protocol error — server rejected handshake (e.g. session reaped)
  4001, // session expired / not found
  4003, // unauthorized
])

export type WebSocketTransportOptions = {
  /** When false, the transport does not attempt automatic reconnection on
   * disconnect. Use this when the caller has its own recovery mechanism
   * (e.g. the REPL bridge poll loop). Defaults to true. */
  autoReconnect?: boolean
  /** Gates the tengu_ws_transport_* telemetry events. Set true at the
   * REPL-bridge construction site so only Remote Control sessions (the
   * Cloudflare-idle-timeout population) emit; print-mode workers stay
   * silent. Defaults to false. */
  isBridge?: boolean
}

/**
 * Lifecycle of the transport:
 * - 'idle': constructed, never connected.
 * - 'reconnecting': a connect attempt is in flight or scheduled.
 * - 'connected': socket open.
 * - 'closing': close() was called by the owner.
 * - 'closed': gave up (permanent close code, autoReconnect off, or budget exhausted).
 */
type WebSocketTransportState =
  | 'idle'
  | 'connected'
  | 'reconnecting'
  | 'closing'
  | 'closed'

// Common interface between globalThis.WebSocket and ws.WebSocket
type WebSocketLike = {
  close(): void
  send(data: string): void
  ping?(): void // Bun & ws both support this
}

/**
 * Shape of the HTTP upgrade response the `ws` package hands to its client
 * 'upgrade' listener (an http.IncomingMessage). Only the headers are read.
 */
type UpgradeResponseLike = {
  headers: Record<string, string | string[] | undefined>
}

/**
 * 'error' listener left attached to a detached `ws` socket.
 *
 * After doDisconnect() strips our listeners, `ws` can still emit 'error'.
 * For example, close() during the opening handshake emits "WebSocket was
 * closed before the connection was established" on the next tick. An
 * EventEmitter 'error' with no listener throws, which would crash the
 * process.
 */
function ignoreDetachedWsError(): void {
  // Intentionally empty: the socket is already being torn down.
}

/**
 * Transport that exchanges newline-delimited JSON messages over a WebSocket.
 *
 * - Uses Bun's native WebSocket when running under Bun, otherwise the Node
 *   `ws` package.
 * - Outbound messages that carry a `uuid` are kept in a bounded buffer and
 *   replayed after a reconnect.
 * - Dead connections are detected with ping/pong plus a wall-clock gap check.
 * - Reconnection uses jittered exponential backoff within a fixed time
 *   budget (unless autoReconnect is disabled).
 */
export class WebSocketTransport implements Transport {
  private ws: WebSocketLike | null = null
  private lastSentId: string | null = null
  protected url: URL
  protected state: WebSocketTransportState = 'idle'
  protected onData?: (data: string) => void
  private onCloseCallback?: (closeCode?: number) => void
  private onConnectCallback?: () => void
  private headers: Record<string, string>
  private sessionId?: string
  private autoReconnect: boolean
  private isBridge: boolean

  // Reconnection state
  private reconnectAttempts = 0
  private reconnectStartTime: number | null = null
  private reconnectTimer: NodeJS.Timeout | null = null
  private lastReconnectAttemptTime: number | null = null
  // Wall-clock of last WS data-frame activity (inbound message or outbound
  // ws.send). Used to compute idle time at close — the signal for diagnosing
  // proxy idle-timeout RSTs (e.g. Cloudflare 5-min). Excludes ping/pong
  // control frames (proxies don't count those).
  private lastActivityTime = 0

  // Ping interval for connection health checks
  private pingInterval: NodeJS.Timeout | null = null
  private pongReceived = true

  // Periodic keep_alive data frames to reset proxy idle timers
  private keepAliveInterval: NodeJS.Timeout | null = null

  // Message buffering for replay on reconnection
  private messageBuffer: CircularBuffer<StdoutMessage>
  // Track which runtime's WS we're using so we can detach listeners
  // with the matching API (removeEventListener vs. off).
  private isBunWs = false

  // Captured at connect() time for handleOpenEvent timing. Stored as an
  // instance field so the onOpen handler can be a stable class-property
  // arrow function (removable in doDisconnect) instead of a closure over
  // a local variable.
  private connectStartTime = 0

  // Value of the x-last-request-id upgrade-response header for the Node
  // socket currently handshaking. Set by onNodeUpgrade, consumed by
  // onNodeOpen.
  private upgradeLastRequestId: string | null = null

  private refreshHeaders?: () => Record<string, string>

  /**
   * @param url WebSocket endpoint.
   * @param headers Headers sent on every handshake. This object is updated
   *   in place when refreshHeaders returns new values.
   * @param sessionId Only used to label debug logs.
   * @param refreshHeaders Called before each reconnect, and on close code
   *   4003, to obtain updated headers (e.g. a new auth token).
   * @param options See WebSocketTransportOptions.
   */
  constructor(
    url: URL,
    headers: Record<string, string> = {},
    sessionId?: string,
    refreshHeaders?: () => Record<string, string>,
    options?: WebSocketTransportOptions,
  ) {
    this.url = url
    this.headers = headers
    this.sessionId = sessionId
    this.refreshHeaders = refreshHeaders
    this.autoReconnect = options?.autoReconnect ?? true
    this.isBridge = options?.isBridge ?? false
    this.messageBuffer = new CircularBuffer(DEFAULT_MAX_BUFFER_SIZE)
  }

  /**
   * Open a WebSocket connection.
   *
   * Only valid from 'idle' or 'reconnecting'; otherwise it logs and returns.
   * The returned promise resolves once the socket object is created, not
   * once it is open. Completion is signalled through the 'open' handler
   * and the onConnect callback.
   */
  public async connect(): Promise<void> {
    if (this.state !== 'idle' && this.state !== 'reconnecting') {
      logForDebugging(
        `WebSocketTransport: Cannot connect, current state is ${this.state}`,
        { level: 'error' },
      )
      logForDiagnosticsNoPII('error', 'cli_websocket_connect_failed')
      return
    }
    this.state = 'reconnecting'

    this.connectStartTime = Date.now()
    logForDebugging(`WebSocketTransport: Opening ${this.url.href}`)
    logForDiagnosticsNoPII('info', 'cli_websocket_connect_opening')

    // Start with provided headers and add runtime headers
    const headers = { ...this.headers }
    if (this.lastSentId) {
      headers['X-Last-Request-Id'] = this.lastSentId
      logForDebugging(
        `WebSocketTransport: Adding X-Last-Request-Id header: ${this.lastSentId}`,
      )
    }

    if (typeof Bun !== 'undefined') {
      // Bun's WebSocket supports headers/proxy options but the DOM typings don't
      // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
      const ws = new globalThis.WebSocket(this.url.href, {
        headers,
        proxy: getWebSocketProxyUrl(this.url.href),
        tls: getWebSocketTLSOptions() || undefined,
      } as unknown as string[])
      this.ws = ws
      this.isBunWs = true

      ws.addEventListener('open', this.onBunOpen)
      ws.addEventListener('message', this.onBunMessage)
      ws.addEventListener('error', this.onBunError)
      // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
      ws.addEventListener('close', this.onBunClose)
      // 'pong' is Bun-specific — not in DOM typings.
      ws.addEventListener('pong', this.onPong)
    } else {
      const { default: WS } = await import('ws')
      // close() may have run while the module was loading. Creating a socket
      // now would attach listeners to an abandoned transport, and its 'open'
      // would flip the state back to 'connected'.
      if (this.state !== 'reconnecting') {
        logForDebugging(
          'WebSocketTransport: Transport closed while connecting, not opening socket',
        )
        return
      }
      this.upgradeLastRequestId = null
      const ws = new WS(this.url.href, {
        headers,
        agent: getWebSocketProxyAgent(this.url.href),
        ...getWebSocketTLSOptions(),
      })
      this.ws = ws
      this.isBunWs = false

      // 'upgrade' fires before 'open' and carries the handshake response
      // headers (the only place ws >= 3 exposes them on the client).
      ws.on('upgrade', this.onNodeUpgrade)
      ws.on('open', this.onNodeOpen)
      ws.on('message', this.onNodeMessage)
      ws.on('error', this.onNodeError)
      ws.on('close', this.onNodeClose)
      ws.on('pong', this.onPong)
    }
  }

  // --- Bun (native WebSocket) event handlers ---
  // Stored as class-property arrow functions so they can be removed in
  // doDisconnect(). Without removal, each reconnect orphans the old WS
  // object + its closures until GC, which accumulates under network
  // instability. Mirrors the pattern in src/utils/mcpWebSocketTransport.ts.

  private onBunOpen = () => {
    this.handleOpenEvent()
    // onConnectCallback (run inside handleOpenEvent) may have closed us.
    if (this.state !== 'connected') return
    // Bun's WebSocket doesn't expose upgrade response headers,
    // so replay all buffered messages. The server deduplicates by UUID.
    if (this.lastSentId) {
      this.replayBufferedMessages('')
    }
  }

  private onBunMessage = (event: MessageEvent) => {
    const message =
      typeof event.data === 'string' ? event.data : String(event.data)
    this.lastActivityTime = Date.now()
    logForDiagnosticsNoPII('info', 'cli_websocket_message_received', {
      length: message.length,
    })
    if (this.onData) {
      this.onData(message)
    }
  }

  private onBunError = () => {
    logForDebugging('WebSocketTransport: Error', {
      level: 'error',
    })
    logForDiagnosticsNoPII('error', 'cli_websocket_connect_error')
    // close event fires after error — let it call handleConnectionError
  }

  // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
  private onBunClose = (event: CloseEvent) => {
    const isClean = event.code === 1000 || event.code === 1001
    logForDebugging(
      `WebSocketTransport: Closed: ${event.code}`,
      isClean ? undefined : { level: 'error' },
    )
    logForDiagnosticsNoPII('error', 'cli_websocket_connect_closed')
    this.handleConnectionError(event.code)
  }

  // --- Node (ws package) event handlers ---

  /** Records the server's last-received message id from the handshake response. */
  private onNodeUpgrade = (response: UpgradeResponseLike) => {
    const lastId = response.headers['x-last-request-id']
    this.upgradeLastRequestId =
      typeof lastId === 'string' && lastId ? lastId : null
  }

  private onNodeOpen = () => {
    // Capture before handleOpenEvent() invokes onConnectCallback — if the
    // callback synchronously closes the transport, this.ws becomes null.
    const ws = this.ws
    const upgradeLastId = this.upgradeLastRequestId
    this.upgradeLastRequestId = null
    this.handleOpenEvent()
    if (!ws || this.state !== 'connected') return
    // ws >= 3 removed `upgradeReq`; the header normally arrives via the
    // 'upgrade' event (onNodeUpgrade). upgradeReq is only a legacy fallback.
    const nws = ws as unknown as WsWebSocket & {
      upgradeReq?: { headers?: Record<string, string> }
    }
    const serverLastId =
      upgradeLastId ?? nws.upgradeReq?.headers?.['x-last-request-id']
    if (serverLastId) {
      this.replayBufferedMessages(serverLastId)
    }
  }

  private onNodeMessage = (data: Buffer) => {
    const message = data.toString()
    this.lastActivityTime = Date.now()
    logForDiagnosticsNoPII('info', 'cli_websocket_message_received', {
      length: message.length,
    })
    if (this.onData) {
      this.onData(message)
    }
  }

  private onNodeError = (err: Error) => {
    logForDebugging(`WebSocketTransport: Error: ${err.message}`, {
      level: 'error',
    })
    logForDiagnosticsNoPII('error', 'cli_websocket_connect_error')
    // close event fires after error — let it call handleConnectionError
  }

  private onNodeClose = (code: number, _reason: Buffer) => {
    const isClean = code === 1000 || code === 1001
    logForDebugging(
      `WebSocketTransport: Closed: ${code}`,
      isClean ? undefined : { level: 'error' },
    )
    logForDiagnosticsNoPII('error', 'cli_websocket_connect_closed')
    this.handleConnectionError(code)
  }

  // --- Shared handlers ---

  private onPong = () => {
    this.pongReceived = true
  }

  /**
   * Common 'open' handling for both runtimes.
   *
   * Resets reconnection bookkeeping, marks the transport 'connected', and
   * starts the health-check timers. The owner's onConnect callback runs
   * last. If it synchronously closes the transport, close() tears the
   * timers down again instead of leaving them running on a closed transport.
   */
  private handleOpenEvent(): void {
    const connectDuration = Date.now() - this.connectStartTime
    logForDebugging('WebSocketTransport: Connected')
    logForDiagnosticsNoPII('info', 'cli_websocket_connect_connected', {
      duration_ms: connectDuration,
    })

    // Reconnect success — capture attempt count + downtime before resetting.
    // reconnectStartTime is null on first connect, non-null on reopen.
    if (this.isBridge && this.reconnectStartTime !== null) {
      logEvent('tengu_ws_transport_reconnected', {
        attempts: this.reconnectAttempts,
        downtimeMs: Date.now() - this.reconnectStartTime,
      })
    }

    this.reconnectAttempts = 0
    this.reconnectStartTime = null
    this.lastReconnectAttemptTime = null
    this.lastActivityTime = Date.now()
    this.state = 'connected'

    // Start periodic pings to detect dead connections
    this.startPingInterval()

    // Start periodic keep_alive data frames to reset proxy idle timers
    this.startKeepaliveInterval()

    // Register callback for session activity signals
    registerSessionActivityCallback(() => {
      void this.write({ type: 'keep_alive' })
    })

    this.onConnectCallback?.()
  }

  /**
   * Send one pre-serialized line.
   *
   * Returns false when not connected or when ws.send throws. The throwing
   * case has already been routed to handleConnectionError() before
   * returning, so callers must not report it again.
   */
  protected sendLine(line: string): boolean {
    if (!this.ws || this.state !== 'connected') {
      logForDebugging('WebSocketTransport: Not connected')
      logForDiagnosticsNoPII('info', 'cli_websocket_send_not_connected')
      return false
    }

    try {
      this.ws.send(line)
      this.lastActivityTime = Date.now()
      return true
    } catch (error) {
      logForDebugging(`WebSocketTransport: Failed to send: ${error}`, {
        level: 'error',
      })
      logForDiagnosticsNoPII('error', 'cli_websocket_send_error')
      // Don't null this.ws here — let doDisconnect() (via handleConnectionError)
      // handle cleanup so listeners are removed before the WS is released.
      this.handleConnectionError()
      return false
    }
  }

  /**
   * Remove all listeners attached in connect() for the given WebSocket.
   * Without this, each reconnect orphans the old WS object + its closures
   * until GC — these accumulate under network instability. Mirrors the
   * pattern in src/utils/mcpWebSocketTransport.ts.
   */
  private removeWsListeners(ws: WebSocketLike): void {
    if (this.isBunWs) {
      const nws = ws as unknown as globalThis.WebSocket
      nws.removeEventListener('open', this.onBunOpen)
      nws.removeEventListener('message', this.onBunMessage)
      nws.removeEventListener('error', this.onBunError)
      // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
      nws.removeEventListener('close', this.onBunClose)
      // 'pong' is Bun-specific — not in DOM typings
      nws.removeEventListener('pong' as 'message', this.onPong)
    } else {
      const nws = ws as unknown as WsWebSocket
      nws.off('upgrade', this.onNodeUpgrade)
      nws.off('open', this.onNodeOpen)
      nws.off('message', this.onNodeMessage)
      nws.off('error', this.onNodeError)
      nws.off('close', this.onNodeClose)
      nws.off('pong', this.onPong)
    }
  }

  /**
   * Stop timers, unregister the session-activity callback, and detach and
   * close the current socket (if any).
   */
  protected doDisconnect(): void {
    // Stop pinging and keepalive when disconnecting
    this.stopPingInterval()
    this.stopKeepaliveInterval()

    // Unregister session activity callback
    unregisterSessionActivityCallback()

    const ws = this.ws
    if (!ws) return
    this.ws = null

    // Remove listeners BEFORE close() so the old WS + closures can be
    // GC'd promptly instead of lingering until the next mark-and-sweep.
    this.removeWsListeners(ws)
    if (!this.isBunWs) {
      // ws emits 'error' when close() aborts an in-progress handshake (and
      // on late protocol errors). With no listener, EventEmitter throws.
      const nodeWs = ws as unknown as WsWebSocket
      nodeWs.on('error', ignoreDetachedWsError)
    }
    ws.close()
  }

  /**
   * Handle a lost or failed connection.
   *
   * Depending on the situation, this either closes the transport for good
   * (permanent close code, autoReconnect disabled, or budget exhausted) or
   * schedules a reconnect with jittered exponential backoff.
   *
   * @param closeCode WebSocket close code when known (absent for
   *   locally-detected failures such as ping timeouts).
   */
  private handleConnectionError(closeCode?: number): void {
    logForDebugging(
      `WebSocketTransport: Disconnected from ${this.url.href}` +
        (closeCode != null ? ` (code ${closeCode})` : ''),
    )
    logForDiagnosticsNoPII('info', 'cli_websocket_disconnected')
    if (this.isBridge) {
      // Fire on every close — including intermediate ones during a reconnect
      // storm (those never surface to the onCloseCallback consumer). For the
      // Cloudflare-5min-idle hypothesis: cluster msSinceLastActivity; if the
      // peak sits at ~300s with closeCode 1006, that's the proxy RST.
      logEvent('tengu_ws_transport_closed', {
        closeCode,
        msSinceLastActivity:
          this.lastActivityTime > 0 ? Date.now() - this.lastActivityTime : -1,
        // 'connected' = healthy drop (the Cloudflare case); 'reconnecting' =
        // connect-rejection mid-storm. State isn't mutated until the branches
        // below, so this reads the pre-close value.
        wasConnected: this.state === 'connected',
        reconnectAttempts: this.reconnectAttempts,
      })
    }
    this.doDisconnect()

    if (this.state === 'closing' || this.state === 'closed') return

    // Permanent codes: don't retry — server has definitively ended the session.
    // Exception: 4003 (unauthorized) can be retried when refreshHeaders is
    // available and returns a new token (e.g. after the parent process mints
    // a fresh session ingress token during reconnection).
    let headersRefreshed = false
    if (closeCode === 4003 && this.refreshHeaders) {
      const freshHeaders = this.refreshHeaders()
      if (freshHeaders.Authorization !== this.headers.Authorization) {
        Object.assign(this.headers, freshHeaders)
        headersRefreshed = true
        logForDebugging(
          'WebSocketTransport: 4003 received but headers refreshed, scheduling reconnect',
        )
        logForDiagnosticsNoPII('info', 'cli_websocket_4003_token_refreshed')
      }
    }

    if (
      closeCode != null &&
      PERMANENT_CLOSE_CODES.has(closeCode) &&
      !headersRefreshed
    ) {
      logForDebugging(
        `WebSocketTransport: Permanent close code ${closeCode}, not reconnecting`,
        { level: 'error' },
      )
      logForDiagnosticsNoPII('error', 'cli_websocket_permanent_close', {
        closeCode,
      })
      this.state = 'closed'
      this.onCloseCallback?.(closeCode)
      return
    }

    // When autoReconnect is disabled, go straight to closed state.
    // The caller (e.g. REPL bridge poll loop) handles recovery.
    if (!this.autoReconnect) {
      this.state = 'closed'
      this.onCloseCallback?.(closeCode)
      return
    }

    // Schedule reconnection with exponential backoff and time budget
    const now = Date.now()
    if (this.reconnectStartTime === null) {
      this.reconnectStartTime = now
    }

    // Detect system sleep/wake: if the gap since our last reconnection
    // attempt greatly exceeds the max delay, the machine likely slept
    // (e.g. laptop lid closed). Reset the budget and retry from scratch —
    // the server will reject with permanent close codes (4001/1002) if
    // the session was reaped while we were asleep.
    if (
      this.lastReconnectAttemptTime !== null &&
      now - this.lastReconnectAttemptTime > SLEEP_DETECTION_THRESHOLD_MS
    ) {
      logForDebugging(
        `WebSocketTransport: Detected system sleep (${Math.round((now - this.lastReconnectAttemptTime) / 1000)}s gap), resetting reconnection budget`,
      )
      logForDiagnosticsNoPII('info', 'cli_websocket_sleep_detected', {
        gapMs: now - this.lastReconnectAttemptTime,
      })
      this.reconnectStartTime = now
      this.reconnectAttempts = 0
    }
    this.lastReconnectAttemptTime = now

    const elapsed = now - this.reconnectStartTime
    if (elapsed < DEFAULT_RECONNECT_GIVE_UP_MS) {
      // Clear any existing reconnection timer to avoid duplicates
      if (this.reconnectTimer) {
        clearTimeout(this.reconnectTimer)
        this.reconnectTimer = null
      }

      // Refresh headers before reconnecting (e.g. to pick up a new session token).
      // Skip if already refreshed by the 4003 path above.
      if (!headersRefreshed && this.refreshHeaders) {
        const freshHeaders = this.refreshHeaders()
        Object.assign(this.headers, freshHeaders)
        logForDebugging('WebSocketTransport: Refreshed headers for reconnect')
      }

      this.state = 'reconnecting'
      this.reconnectAttempts++

      const baseDelay = Math.min(
        DEFAULT_BASE_RECONNECT_DELAY * Math.pow(2, this.reconnectAttempts - 1),
        DEFAULT_MAX_RECONNECT_DELAY,
      )
      // Add ±25% jitter to avoid thundering herd
      const delay = Math.max(
        0,
        baseDelay + baseDelay * 0.25 * (2 * Math.random() - 1),
      )

      logForDebugging(
        `WebSocketTransport: Reconnecting in ${Math.round(delay)}ms (attempt ${this.reconnectAttempts}, ${Math.round(elapsed / 1000)}s elapsed)`,
      )
      logForDiagnosticsNoPII('error', 'cli_websocket_reconnect_attempt', {
        reconnectAttempts: this.reconnectAttempts,
      })
      if (this.isBridge) {
        logEvent('tengu_ws_transport_reconnecting', {
          attempt: this.reconnectAttempts,
          elapsedMs: elapsed,
          delayMs: Math.round(delay),
        })
      }

      this.reconnectTimer = setTimeout(() => {
        this.reconnectTimer = null
        // connect() can throw (e.g. socket construction or module load
        // failure). Nobody awaits this call, so an uncaught rejection would
        // be an unhandled rejection; treat it as a failed attempt and let
        // the backoff/budget logic decide what happens next.
        this.connect().catch((error: unknown) => {
          logForDebugging(
            `WebSocketTransport: Reconnect attempt failed: ${error}`,
            { level: 'error' },
          )
          logForDiagnosticsNoPII('error', 'cli_websocket_connect_error')
          this.handleConnectionError()
        })
      }, delay)
    } else {
      logForDebugging(
        `WebSocketTransport: Reconnection time budget exhausted after ${Math.round(elapsed / 1000)}s for ${this.url.href}`,
        { level: 'error' },
      )
      logForDiagnosticsNoPII('error', 'cli_websocket_reconnect_exhausted', {
        reconnectAttempts: this.reconnectAttempts,
        elapsedMs: elapsed,
      })
      this.state = 'closed'

      // Notify close callback
      if (this.onCloseCallback) {
        this.onCloseCallback(closeCode)
      }
    }
  }

  /**
   * Owner-initiated shutdown.
   *
   * Cancels any pending reconnect, moves to 'closing' (which connect()
   * refuses), and tears the socket down. The interval and activity cleanup
   * is repeated here explicitly rather than relying only on doDisconnect(),
   * which subclasses may override.
   */
  close(): void {
    // Clear any pending reconnection timer
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = null
    }

    // Clear ping and keepalive intervals
    this.stopPingInterval()
    this.stopKeepaliveInterval()

    // Unregister session activity callback
    unregisterSessionActivityCallback()

    this.state = 'closing'
    this.doDisconnect()
  }

  /**
   * Resend buffered messages after a reconnect.
   *
   * @param lastId uuid of the last message the server reports receiving.
   *   If it is found in the buffer, that message and everything before it
   *   are evicted and only later messages are resent. Otherwise (including
   *   '') the whole buffer is resent.
   */
  private replayBufferedMessages(lastId: string): void {
    const messages = this.messageBuffer.toArray()
    if (messages.length === 0) return

    // Find where to start replay based on server's last received message
    let startIndex = 0
    if (lastId) {
      const lastConfirmedIndex = messages.findIndex(
        message => 'uuid' in message && message.uuid === lastId,
      )
      if (lastConfirmedIndex >= 0) {
        // Server confirmed messages up to lastConfirmedIndex — evict them
        startIndex = lastConfirmedIndex + 1
        // Rebuild the buffer with only unconfirmed messages
        const remaining = messages.slice(startIndex)
        this.messageBuffer.clear()
        this.messageBuffer.addAll(remaining)
        if (remaining.length === 0) {
          this.lastSentId = null
        }
        logForDebugging(
          `WebSocketTransport: Evicted ${startIndex} confirmed messages, ${remaining.length} remaining`,
        )
        logForDiagnosticsNoPII(
          'info',
          'cli_websocket_evicted_confirmed_messages',
          {
            evicted: startIndex,
            remaining: remaining.length,
          },
        )
      }
    }

    const messagesToReplay = messages.slice(startIndex)
    if (messagesToReplay.length === 0) {
      logForDebugging('WebSocketTransport: No new messages to replay')
      logForDiagnosticsNoPII('info', 'cli_websocket_no_messages_to_replay')
      return
    }

    logForDebugging(
      `WebSocketTransport: Replaying ${messagesToReplay.length} buffered messages`,
    )
    logForDiagnosticsNoPII('info', 'cli_websocket_messages_to_replay', {
      count: messagesToReplay.length,
    })

    for (const message of messagesToReplay) {
      const line = jsonStringify(message) + '\n'
      // On failure sendLine() has already handled the disconnect (or we were
      // not connected to begin with). Calling handleConnectionError() again
      // would double-count the reconnect attempt and reschedule the timer.
      if (!this.sendLine(line)) {
        break
      }
    }
    // Do NOT clear the buffer after replay — messages remain buffered until
    // the server confirms receipt on the next reconnection. This prevents
    // message loss if the connection drops after replay but before the server
    // processes the messages.
  }

  isConnectedStatus(): boolean {
    return this.state === 'connected'
  }

  isClosedStatus(): boolean {
    return this.state === 'closed'
  }

  setOnData(callback: (data: string) => void): void {
    this.onData = callback
  }

  setOnConnect(callback: () => void): void {
    this.onConnectCallback = callback
  }

  setOnClose(callback: (closeCode?: number) => void): void {
    this.onCloseCallback = callback
  }

  getStateLabel(): string {
    return this.state
  }

  /**
   * Send a message.
   *
   * A message with a string `uuid` is always buffered for replay, even when
   * it is also sent now. When not connected, nothing is sent: uuid-bearing
   * messages wait for a replay and all others are dropped.
   */
  async write(message: StdoutMessage): Promise<void> {
    if ('uuid' in message && typeof message.uuid === 'string') {
      this.messageBuffer.add(message)
      this.lastSentId = message.uuid
    }

    if (this.state !== 'connected') {
      // Message buffered for replay when connected (if it has a UUID)
      return
    }

    const line = jsonStringify(message) + '\n'
    const sessionLabel = this.sessionId ? ` session=${this.sessionId}` : ''
    const detailLabel = this.getControlMessageDetailLabel(message)

    logForDebugging(
      `WebSocketTransport: Sending message type=${message.type}${sessionLabel}${detailLabel}`,
    )

    this.sendLine(line)
  }

  /** Extra debug-log detail for control_request / control_response messages. */
  private getControlMessageDetailLabel(message: StdoutMessage): string {
    if (message.type === 'control_request') {
      const { request_id, request } = message
      const toolName =
        request.subtype === 'can_use_tool' ? request.tool_name : ''
      return ` subtype=${request.subtype} request_id=${request_id}${toolName ? ` tool=${toolName}` : ''}`
    }
    if (message.type === 'control_response') {
      const { subtype, request_id } = message.response
      return ` subtype=${subtype} request_id=${request_id}`
    }
    return ''
  }

  private startPingInterval(): void {
    // Clear any existing interval
    this.stopPingInterval()

    this.pongReceived = true
    let lastTickTime = Date.now()

    // Send ping periodically to detect dead connections.
    // If the previous ping got no pong, treat the connection as dead.
    this.pingInterval = setInterval(() => {
      if (this.state === 'connected' && this.ws) {
        const now = Date.now()
        const gap = now - lastTickTime
        lastTickTime = now

        // Process-suspension detector. If the wall-clock gap between ticks
        // greatly exceeds the 10s interval, the process was suspended
        // (laptop lid, SIGSTOP, VM pause). setInterval does not queue
        // missed ticks — it coalesces — so on wake this callback fires
        // once with a huge gap. The socket is almost certainly dead:
        // NAT mappings drop in 30s–5min, and the server has been
        // retransmitting into the void. Don't wait for a ping/pong
        // round-trip to confirm (ws.ping() on a dead socket returns
        // immediately with no error — bytes go into the kernel send
        // buffer). Assume dead and reconnect now. A spurious reconnect
        // after a short sleep is cheap — replayBufferedMessages() handles
        // it and the server dedups by UUID.
        if (gap > SLEEP_DETECTION_THRESHOLD_MS) {
          logForDebugging(
            `WebSocketTransport: ${Math.round(gap / 1000)}s tick gap detected — process was suspended, forcing reconnect`,
          )
          logForDiagnosticsNoPII(
            'info',
            'cli_websocket_sleep_detected_on_ping',
            { gapMs: gap },
          )
          this.handleConnectionError()
          return
        }

        if (!this.pongReceived) {
          logForDebugging(
            'WebSocketTransport: No pong received, connection appears dead',
            { level: 'error' },
          )
          logForDiagnosticsNoPII('error', 'cli_websocket_pong_timeout')
          this.handleConnectionError()
          return
        }

        this.pongReceived = false
        try {
          this.ws.ping?.()
        } catch (error) {
          logForDebugging(`WebSocketTransport: Ping failed: ${error}`, {
            level: 'error',
          })
          logForDiagnosticsNoPII('error', 'cli_websocket_ping_failed')
        }
      }
    }, DEFAULT_PING_INTERVAL)
  }

  private stopPingInterval(): void {
    if (this.pingInterval) {
      clearInterval(this.pingInterval)
      this.pingInterval = null
    }
  }

  private startKeepaliveInterval(): void {
    this.stopKeepaliveInterval()

    // In CCR sessions, session activity heartbeats handle keep-alives
    if (isEnvTruthy(process.env.CLAUDE_CODE_REMOTE)) {
      return
    }

    this.keepAliveInterval = setInterval(() => {
      if (this.state === 'connected' && this.ws) {
        try {
          this.ws.send(KEEP_ALIVE_FRAME)
          this.lastActivityTime = Date.now()
          logForDebugging(
            'WebSocketTransport: Sent periodic keep_alive data frame',
          )
        } catch (error) {
          logForDebugging(
            `WebSocketTransport: Periodic keep_alive failed: ${error}`,
            { level: 'error' },
          )
          logForDiagnosticsNoPII('error', 'cli_websocket_keepalive_failed')
        }
      }
    }, DEFAULT_KEEPALIVE_INTERVAL)
  }

  private stopKeepaliveInterval(): void {
    if (this.keepAliveInterval) {
      clearInterval(this.keepAliveInterval)
      this.keepAliveInterval = null
    }
  }
}