source dump of claude code
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 711 lines 24 kB view raw
1import axios, { type AxiosError } from 'axios' 2import type { StdoutMessage } from 'src/entrypoints/sdk/controlTypes.js' 3import { logForDebugging } from '../../utils/debug.js' 4import { logForDiagnosticsNoPII } from '../../utils/diagLogs.js' 5import { errorMessage } from '../../utils/errors.js' 6import { getSessionIngressAuthHeaders } from '../../utils/sessionIngressAuth.js' 7import { sleep } from '../../utils/sleep.js' 8import { jsonParse, jsonStringify } from '../../utils/slowOperations.js' 9import { getClaudeCodeUserAgent } from '../../utils/userAgent.js' 10import type { Transport } from './Transport.js' 11 12// --------------------------------------------------------------------------- 13// Configuration 14// --------------------------------------------------------------------------- 15 16const RECONNECT_BASE_DELAY_MS = 1000 17const RECONNECT_MAX_DELAY_MS = 30_000 18/** Time budget for reconnection attempts before giving up (10 minutes). */ 19const RECONNECT_GIVE_UP_MS = 600_000 20/** Server sends keepalives every 15s; treat connection as dead after 45s of silence. */ 21const LIVENESS_TIMEOUT_MS = 45_000 22 23/** 24 * HTTP status codes that indicate a permanent server-side rejection. 25 * The transport transitions to 'closed' immediately without retrying. 26 */ 27const PERMANENT_HTTP_CODES = new Set([401, 403, 404]) 28 29// POST retry configuration (matches HybridTransport) 30const POST_MAX_RETRIES = 10 31const POST_BASE_DELAY_MS = 500 32const POST_MAX_DELAY_MS = 8000 33 34/** Hoisted TextDecoder options to avoid per-chunk allocation in readStream. */ 35const STREAM_DECODE_OPTS: TextDecodeOptions = { stream: true } 36 37/** Hoisted axios validateStatus callback to avoid per-request closure allocation. */ 38function alwaysValidStatus(): boolean { 39 return true 40} 41 42// --------------------------------------------------------------------------- 43// SSE Frame Parser 44// --------------------------------------------------------------------------- 45 46type SSEFrame = { 47 event?: string 48 id?: string 49 data?: string 50} 51 52/** 53 * Incrementally parse SSE frames from a text buffer. 54 * Returns parsed frames and the remaining (incomplete) buffer. 55 * 56 * @internal exported for testing 57 */ 58export function parseSSEFrames(buffer: string): { 59 frames: SSEFrame[] 60 remaining: string 61} { 62 const frames: SSEFrame[] = [] 63 let pos = 0 64 65 // SSE frames are delimited by double newlines 66 let idx: number 67 while ((idx = buffer.indexOf('\n\n', pos)) !== -1) { 68 const rawFrame = buffer.slice(pos, idx) 69 pos = idx + 2 70 71 // Skip empty frames 72 if (!rawFrame.trim()) continue 73 74 const frame: SSEFrame = {} 75 let isComment = false 76 77 for (const line of rawFrame.split('\n')) { 78 if (line.startsWith(':')) { 79 // SSE comment (e.g., `:keepalive`) 80 isComment = true 81 continue 82 } 83 84 const colonIdx = line.indexOf(':') 85 if (colonIdx === -1) continue 86 87 const field = line.slice(0, colonIdx) 88 // Per SSE spec, strip one leading space after colon if present 89 const value = 90 line[colonIdx + 1] === ' ' 91 ? line.slice(colonIdx + 2) 92 : line.slice(colonIdx + 1) 93 94 switch (field) { 95 case 'event': 96 frame.event = value 97 break 98 case 'id': 99 frame.id = value 100 break 101 case 'data': 102 // Per SSE spec, multiple data: lines are concatenated with \n 103 frame.data = frame.data ? frame.data + '\n' + value : value 104 break 105 // Ignore other fields (retry:, etc.) 106 } 107 } 108 109 // Only emit frames that have data (or are pure comments which reset liveness) 110 if (frame.data || isComment) { 111 frames.push(frame) 112 } 113 } 114 115 return { frames, remaining: buffer.slice(pos) } 116} 117 118// --------------------------------------------------------------------------- 119// Types 120// --------------------------------------------------------------------------- 121 122type SSETransportState = 123 | 'idle' 124 | 'connected' 125 | 'reconnecting' 126 | 'closing' 127 | 'closed' 128 129/** 130 * Payload for `event: client_event` frames, matching the StreamClientEvent 131 * proto message in session_stream.proto. This is the only event type sent 132 * to worker subscribers — delivery_update, session_update, ephemeral_event, 133 * and catch_up_truncated are client-channel-only (see notifier.go and 134 * event_stream.go SubscriberClient guard). 135 */ 136export type StreamClientEvent = { 137 event_id: string 138 sequence_num: number 139 event_type: string 140 source: string 141 payload: Record<string, unknown> 142 created_at: string 143} 144 145// --------------------------------------------------------------------------- 146// SSETransport 147// --------------------------------------------------------------------------- 148 149/** 150 * Transport that uses SSE for reading and HTTP POST for writing. 151 * 152 * Reads events via Server-Sent Events from the CCR v2 event stream endpoint. 153 * Writes events via HTTP POST with retry logic (same pattern as HybridTransport). 154 * 155 * Each `event: client_event` frame carries a StreamClientEvent proto JSON 156 * directly in `data:`. The transport extracts `payload` and passes it to 157 * `onData` as newline-delimited JSON for StructuredIO consumers. 158 * 159 * Supports automatic reconnection with exponential backoff and Last-Event-ID 160 * for resumption after disconnection. 161 */ 162export class SSETransport implements Transport { 163 private state: SSETransportState = 'idle' 164 private onData?: (data: string) => void 165 private onCloseCallback?: (closeCode?: number) => void 166 private onEventCallback?: (event: StreamClientEvent) => void 167 private headers: Record<string, string> 168 private sessionId?: string 169 private refreshHeaders?: () => Record<string, string> 170 private readonly getAuthHeaders: () => Record<string, string> 171 172 // SSE connection state 173 private abortController: AbortController | null = null 174 private lastSequenceNum = 0 175 private seenSequenceNums = new Set<number>() 176 177 // Reconnection state 178 private reconnectAttempts = 0 179 private reconnectStartTime: number | null = null 180 private reconnectTimer: NodeJS.Timeout | null = null 181 182 // Liveness detection 183 private livenessTimer: NodeJS.Timeout | null = null 184 185 // POST URL (derived from SSE URL) 186 private postUrl: string 187 188 // Runtime epoch for CCR v2 event format 189 190 constructor( 191 private readonly url: URL, 192 headers: Record<string, string> = {}, 193 sessionId?: string, 194 refreshHeaders?: () => Record<string, string>, 195 initialSequenceNum?: number, 196 /** 197 * Per-instance auth header source. Omit to read the process-wide 198 * CLAUDE_CODE_SESSION_ACCESS_TOKEN (single-session callers). Required 199 * for concurrent multi-session callers — the env-var path is a process 200 * global and would stomp across sessions. 201 */ 202 getAuthHeaders?: () => Record<string, string>, 203 ) { 204 this.headers = headers 205 this.sessionId = sessionId 206 this.refreshHeaders = refreshHeaders 207 this.getAuthHeaders = getAuthHeaders ?? getSessionIngressAuthHeaders 208 this.postUrl = convertSSEUrlToPostUrl(url) 209 // Seed with a caller-provided high-water mark so the first connect() 210 // sends from_sequence_num / Last-Event-ID. Without this, a fresh 211 // SSETransport always asks the server to replay from sequence 0 — 212 // the entire session history on every transport swap. 213 if (initialSequenceNum !== undefined && initialSequenceNum > 0) { 214 this.lastSequenceNum = initialSequenceNum 215 } 216 logForDebugging(`SSETransport: SSE URL = ${url.href}`) 217 logForDebugging(`SSETransport: POST URL = ${this.postUrl}`) 218 logForDiagnosticsNoPII('info', 'cli_sse_transport_initialized') 219 } 220 221 /** 222 * High-water mark of sequence numbers seen on this stream. Callers that 223 * recreate the transport (e.g. replBridge onWorkReceived) read this before 224 * close() and pass it as `initialSequenceNum` to the next instance so the 225 * server resumes from the right point instead of replaying everything. 226 */ 227 getLastSequenceNum(): number { 228 return this.lastSequenceNum 229 } 230 231 async connect(): Promise<void> { 232 if (this.state !== 'idle' && this.state !== 'reconnecting') { 233 logForDebugging( 234 `SSETransport: Cannot connect, current state is ${this.state}`, 235 { level: 'error' }, 236 ) 237 logForDiagnosticsNoPII('error', 'cli_sse_connect_failed') 238 return 239 } 240 241 this.state = 'reconnecting' 242 const connectStartTime = Date.now() 243 244 // Build SSE URL with sequence number for resumption 245 const sseUrl = new URL(this.url.href) 246 if (this.lastSequenceNum > 0) { 247 sseUrl.searchParams.set('from_sequence_num', String(this.lastSequenceNum)) 248 } 249 250 // Build headers -- use fresh auth headers (supports Cookie for session keys). 251 // Remove stale Authorization header from this.headers when Cookie auth is used, 252 // since sending both confuses the auth interceptor. 253 const authHeaders = this.getAuthHeaders() 254 const headers: Record<string, string> = { 255 ...this.headers, 256 ...authHeaders, 257 Accept: 'text/event-stream', 258 'anthropic-version': '2023-06-01', 259 'User-Agent': getClaudeCodeUserAgent(), 260 } 261 if (authHeaders['Cookie']) { 262 delete headers['Authorization'] 263 } 264 if (this.lastSequenceNum > 0) { 265 headers['Last-Event-ID'] = String(this.lastSequenceNum) 266 } 267 268 logForDebugging(`SSETransport: Opening ${sseUrl.href}`) 269 logForDiagnosticsNoPII('info', 'cli_sse_connect_opening') 270 271 this.abortController = new AbortController() 272 273 try { 274 // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins 275 const response = await fetch(sseUrl.href, { 276 headers, 277 signal: this.abortController.signal, 278 }) 279 280 if (!response.ok) { 281 const isPermanent = PERMANENT_HTTP_CODES.has(response.status) 282 logForDebugging( 283 `SSETransport: HTTP ${response.status}${isPermanent ? ' (permanent)' : ''}`, 284 { level: 'error' }, 285 ) 286 logForDiagnosticsNoPII('error', 'cli_sse_connect_http_error', { 287 status: response.status, 288 }) 289 290 if (isPermanent) { 291 this.state = 'closed' 292 this.onCloseCallback?.(response.status) 293 return 294 } 295 296 this.handleConnectionError() 297 return 298 } 299 300 if (!response.body) { 301 logForDebugging('SSETransport: No response body') 302 this.handleConnectionError() 303 return 304 } 305 306 // Successfully connected 307 const connectDuration = Date.now() - connectStartTime 308 logForDebugging('SSETransport: Connected') 309 logForDiagnosticsNoPII('info', 'cli_sse_connect_connected', { 310 duration_ms: connectDuration, 311 }) 312 313 this.state = 'connected' 314 this.reconnectAttempts = 0 315 this.reconnectStartTime = null 316 this.resetLivenessTimer() 317 318 // Read the SSE stream 319 await this.readStream(response.body) 320 } catch (error) { 321 if (this.abortController?.signal.aborted) { 322 // Intentional close 323 return 324 } 325 326 logForDebugging( 327 `SSETransport: Connection error: ${errorMessage(error)}`, 328 { level: 'error' }, 329 ) 330 logForDiagnosticsNoPII('error', 'cli_sse_connect_error') 331 this.handleConnectionError() 332 } 333 } 334 335 /** 336 * Read and process the SSE stream body. 337 */ 338 // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins 339 private async readStream(body: ReadableStream<Uint8Array>): Promise<void> { 340 const reader = body.getReader() 341 const decoder = new TextDecoder() 342 let buffer = '' 343 344 try { 345 while (true) { 346 const { done, value } = await reader.read() 347 if (done) break 348 349 buffer += decoder.decode(value, STREAM_DECODE_OPTS) 350 const { frames, remaining } = parseSSEFrames(buffer) 351 buffer = remaining 352 353 for (const frame of frames) { 354 // Any frame (including keepalive comments) proves the connection is alive 355 this.resetLivenessTimer() 356 357 if (frame.id) { 358 const seqNum = parseInt(frame.id, 10) 359 if (!isNaN(seqNum)) { 360 if (this.seenSequenceNums.has(seqNum)) { 361 logForDebugging( 362 `SSETransport: DUPLICATE frame seq=${seqNum} (lastSequenceNum=${this.lastSequenceNum}, seenCount=${this.seenSequenceNums.size})`, 363 { level: 'warn' }, 364 ) 365 logForDiagnosticsNoPII('warn', 'cli_sse_duplicate_sequence') 366 } else { 367 this.seenSequenceNums.add(seqNum) 368 // Prevent unbounded growth: once we have many entries, prune 369 // old sequence numbers that are well below the high-water mark. 370 // Only sequence numbers near lastSequenceNum matter for dedup. 371 if (this.seenSequenceNums.size > 1000) { 372 const threshold = this.lastSequenceNum - 200 373 for (const s of this.seenSequenceNums) { 374 if (s < threshold) { 375 this.seenSequenceNums.delete(s) 376 } 377 } 378 } 379 } 380 if (seqNum > this.lastSequenceNum) { 381 this.lastSequenceNum = seqNum 382 } 383 } 384 } 385 386 if (frame.event && frame.data) { 387 this.handleSSEFrame(frame.event, frame.data) 388 } else if (frame.data) { 389 // data: without event: — server is emitting the old envelope format 390 // or a bug. Log so incidents show as a signal instead of silent drops. 391 logForDebugging( 392 'SSETransport: Frame has data: but no event: field — dropped', 393 { level: 'warn' }, 394 ) 395 logForDiagnosticsNoPII('warn', 'cli_sse_frame_missing_event_field') 396 } 397 } 398 } 399 } catch (error) { 400 if (this.abortController?.signal.aborted) return 401 logForDebugging( 402 `SSETransport: Stream read error: ${errorMessage(error)}`, 403 { level: 'error' }, 404 ) 405 logForDiagnosticsNoPII('error', 'cli_sse_stream_read_error') 406 } finally { 407 reader.releaseLock() 408 } 409 410 // Stream ended — reconnect unless we're closing 411 if (this.state !== 'closing' && this.state !== 'closed') { 412 logForDebugging('SSETransport: Stream ended, reconnecting') 413 this.handleConnectionError() 414 } 415 } 416 417 /** 418 * Handle a single SSE frame. The event: field names the variant; data: 419 * carries the inner proto JSON directly (no envelope). 420 * 421 * Worker subscribers only receive client_event frames (see notifier.go) — 422 * any other event type indicates a server-side change that CC doesn't yet 423 * understand. Log a diagnostic so we notice in telemetry. 424 */ 425 private handleSSEFrame(eventType: string, data: string): void { 426 if (eventType !== 'client_event') { 427 logForDebugging( 428 `SSETransport: Unexpected SSE event type '${eventType}' on worker stream`, 429 { level: 'warn' }, 430 ) 431 logForDiagnosticsNoPII('warn', 'cli_sse_unexpected_event_type', { 432 event_type: eventType, 433 }) 434 return 435 } 436 437 let ev: StreamClientEvent 438 try { 439 ev = jsonParse(data) as StreamClientEvent 440 } catch (error) { 441 logForDebugging( 442 `SSETransport: Failed to parse client_event data: ${errorMessage(error)}`, 443 { level: 'error' }, 444 ) 445 return 446 } 447 448 const payload = ev.payload 449 if (payload && typeof payload === 'object' && 'type' in payload) { 450 const sessionLabel = this.sessionId ? ` session=${this.sessionId}` : '' 451 logForDebugging( 452 `SSETransport: Event seq=${ev.sequence_num} event_id=${ev.event_id} event_type=${ev.event_type} payload_type=${String(payload.type)}${sessionLabel}`, 453 ) 454 logForDiagnosticsNoPII('info', 'cli_sse_message_received') 455 // Pass the unwrapped payload as newline-delimited JSON, 456 // matching the format that StructuredIO/WebSocketTransport consumers expect 457 this.onData?.(jsonStringify(payload) + '\n') 458 } else { 459 logForDebugging( 460 `SSETransport: Ignoring client_event with no type in payload: event_id=${ev.event_id}`, 461 ) 462 } 463 464 this.onEventCallback?.(ev) 465 } 466 467 /** 468 * Handle connection errors with exponential backoff and time budget. 469 */ 470 private handleConnectionError(): void { 471 this.clearLivenessTimer() 472 473 if (this.state === 'closing' || this.state === 'closed') return 474 475 // Abort any in-flight SSE fetch 476 this.abortController?.abort() 477 this.abortController = null 478 479 const now = Date.now() 480 if (!this.reconnectStartTime) { 481 this.reconnectStartTime = now 482 } 483 484 const elapsed = now - this.reconnectStartTime 485 if (elapsed < RECONNECT_GIVE_UP_MS) { 486 // Clear any existing timer 487 if (this.reconnectTimer) { 488 clearTimeout(this.reconnectTimer) 489 this.reconnectTimer = null 490 } 491 492 // Refresh headers before reconnecting 493 if (this.refreshHeaders) { 494 const freshHeaders = this.refreshHeaders() 495 Object.assign(this.headers, freshHeaders) 496 logForDebugging('SSETransport: Refreshed headers for reconnect') 497 } 498 499 this.state = 'reconnecting' 500 this.reconnectAttempts++ 501 502 const baseDelay = Math.min( 503 RECONNECT_BASE_DELAY_MS * Math.pow(2, this.reconnectAttempts - 1), 504 RECONNECT_MAX_DELAY_MS, 505 ) 506 // Add ±25% jitter 507 const delay = Math.max( 508 0, 509 baseDelay + baseDelay * 0.25 * (2 * Math.random() - 1), 510 ) 511 512 logForDebugging( 513 `SSETransport: Reconnecting in ${Math.round(delay)}ms (attempt ${this.reconnectAttempts}, ${Math.round(elapsed / 1000)}s elapsed)`, 514 ) 515 logForDiagnosticsNoPII('error', 'cli_sse_reconnect_attempt', { 516 reconnectAttempts: this.reconnectAttempts, 517 }) 518 519 this.reconnectTimer = setTimeout(() => { 520 this.reconnectTimer = null 521 void this.connect() 522 }, delay) 523 } else { 524 logForDebugging( 525 `SSETransport: Reconnection time budget exhausted after ${Math.round(elapsed / 1000)}s`, 526 { level: 'error' }, 527 ) 528 logForDiagnosticsNoPII('error', 'cli_sse_reconnect_exhausted', { 529 reconnectAttempts: this.reconnectAttempts, 530 elapsedMs: elapsed, 531 }) 532 this.state = 'closed' 533 this.onCloseCallback?.() 534 } 535 } 536 537 /** 538 * Bound timeout callback. Hoisted from an inline closure so that 539 * resetLivenessTimer (called per-frame) does not allocate a new closure 540 * on every SSE frame. 541 */ 542 private readonly onLivenessTimeout = (): void => { 543 this.livenessTimer = null 544 logForDebugging('SSETransport: Liveness timeout, reconnecting', { 545 level: 'error', 546 }) 547 logForDiagnosticsNoPII('error', 'cli_sse_liveness_timeout') 548 this.abortController?.abort() 549 this.handleConnectionError() 550 } 551 552 /** 553 * Reset the liveness timer. If no SSE frame arrives within the timeout, 554 * treat the connection as dead and reconnect. 555 */ 556 private resetLivenessTimer(): void { 557 this.clearLivenessTimer() 558 this.livenessTimer = setTimeout(this.onLivenessTimeout, LIVENESS_TIMEOUT_MS) 559 } 560 561 private clearLivenessTimer(): void { 562 if (this.livenessTimer) { 563 clearTimeout(this.livenessTimer) 564 this.livenessTimer = null 565 } 566 } 567 568 // ----------------------------------------------------------------------- 569 // Write (HTTP POST) — same pattern as HybridTransport 570 // ----------------------------------------------------------------------- 571 572 async write(message: StdoutMessage): Promise<void> { 573 const authHeaders = this.getAuthHeaders() 574 if (Object.keys(authHeaders).length === 0) { 575 logForDebugging('SSETransport: No session token available for POST') 576 logForDiagnosticsNoPII('warn', 'cli_sse_post_no_token') 577 return 578 } 579 580 const headers: Record<string, string> = { 581 ...authHeaders, 582 'Content-Type': 'application/json', 583 'anthropic-version': '2023-06-01', 584 'User-Agent': getClaudeCodeUserAgent(), 585 } 586 587 logForDebugging( 588 `SSETransport: POST body keys=${Object.keys(message as Record<string, unknown>).join(',')}`, 589 ) 590 591 for (let attempt = 1; attempt <= POST_MAX_RETRIES; attempt++) { 592 try { 593 const response = await axios.post(this.postUrl, message, { 594 headers, 595 validateStatus: alwaysValidStatus, 596 }) 597 598 if (response.status === 200 || response.status === 201) { 599 logForDebugging(`SSETransport: POST success type=${message.type}`) 600 return 601 } 602 603 logForDebugging( 604 `SSETransport: POST ${response.status} body=${jsonStringify(response.data).slice(0, 200)}`, 605 ) 606 // 4xx errors (except 429) are permanent - don't retry 607 if ( 608 response.status >= 400 && 609 response.status < 500 && 610 response.status !== 429 611 ) { 612 logForDebugging( 613 `SSETransport: POST returned ${response.status} (client error), not retrying`, 614 ) 615 logForDiagnosticsNoPII('warn', 'cli_sse_post_client_error', { 616 status: response.status, 617 }) 618 return 619 } 620 621 // 429 or 5xx - retry 622 logForDebugging( 623 `SSETransport: POST returned ${response.status}, attempt ${attempt}/${POST_MAX_RETRIES}`, 624 ) 625 logForDiagnosticsNoPII('warn', 'cli_sse_post_retryable_error', { 626 status: response.status, 627 attempt, 628 }) 629 } catch (error) { 630 const axiosError = error as AxiosError 631 logForDebugging( 632 `SSETransport: POST error: ${axiosError.message}, attempt ${attempt}/${POST_MAX_RETRIES}`, 633 ) 634 logForDiagnosticsNoPII('warn', 'cli_sse_post_network_error', { 635 attempt, 636 }) 637 } 638 639 if (attempt === POST_MAX_RETRIES) { 640 logForDebugging( 641 `SSETransport: POST failed after ${POST_MAX_RETRIES} attempts, continuing`, 642 ) 643 logForDiagnosticsNoPII('warn', 'cli_sse_post_retries_exhausted') 644 return 645 } 646 647 const delayMs = Math.min( 648 POST_BASE_DELAY_MS * Math.pow(2, attempt - 1), 649 POST_MAX_DELAY_MS, 650 ) 651 await sleep(delayMs) 652 } 653 } 654 655 // ----------------------------------------------------------------------- 656 // Transport interface 657 // ----------------------------------------------------------------------- 658 659 isConnectedStatus(): boolean { 660 return this.state === 'connected' 661 } 662 663 isClosedStatus(): boolean { 664 return this.state === 'closed' 665 } 666 667 setOnData(callback: (data: string) => void): void { 668 this.onData = callback 669 } 670 671 setOnClose(callback: (closeCode?: number) => void): void { 672 this.onCloseCallback = callback 673 } 674 675 setOnEvent(callback: (event: StreamClientEvent) => void): void { 676 this.onEventCallback = callback 677 } 678 679 close(): void { 680 if (this.reconnectTimer) { 681 clearTimeout(this.reconnectTimer) 682 this.reconnectTimer = null 683 } 684 this.clearLivenessTimer() 685 686 this.state = 'closing' 687 this.abortController?.abort() 688 this.abortController = null 689 } 690} 691 692// --------------------------------------------------------------------------- 693// URL Conversion 694// --------------------------------------------------------------------------- 695 696/** 697 * Convert an SSE URL to the HTTP POST endpoint URL. 698 * The SSE stream URL and POST URL share the same base; the POST endpoint 699 * is at `/events` (without `/stream`). 700 * 701 * From: https://api.example.com/v2/session_ingress/session/<session_id>/events/stream 702 * To: https://api.example.com/v2/session_ingress/session/<session_id>/events 703 */ 704function convertSSEUrlToPostUrl(sseUrl: URL): string { 705 let pathname = sseUrl.pathname 706 // Remove /stream suffix to get the POST events endpoint 707 if (pathname.endsWith('/stream')) { 708 pathname = pathname.slice(0, -'/stream'.length) 709 } 710 return `${sseUrl.protocol}//${sseUrl.host}${pathname}` 711}