Source dump of Claude Code
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 282 lines 11 kB view raw
1import axios, { type AxiosError } from 'axios' 2import type { StdoutMessage } from 'src/entrypoints/sdk/controlTypes.js' 3import { logForDebugging } from '../../utils/debug.js' 4import { logForDiagnosticsNoPII } from '../../utils/diagLogs.js' 5import { getSessionIngressAuthToken } from '../../utils/sessionIngressAuth.js' 6import { SerialBatchEventUploader } from './SerialBatchEventUploader.js' 7import { 8 WebSocketTransport, 9 type WebSocketTransportOptions, 10} from './WebSocketTransport.js' 11 12const BATCH_FLUSH_INTERVAL_MS = 100 13// Per-attempt POST timeout. Bounds how long a single stuck POST can block 14// the serialized queue. Without this, a hung connection stalls all writes. 15const POST_TIMEOUT_MS = 15_000 16// Grace period for queued writes on close(). Covers a healthy POST (~100ms) 17// plus headroom; best-effort, not a delivery guarantee under degraded network. 18// Void-ed (nothing awaits it) so this is a last resort — replBridge teardown 19// now closes AFTER archive so archive latency is the primary drain window. 20// NOTE: gracefulShutdown's cleanup budget is 2s (not the 5s outer failsafe); 21// 3s here exceeds it, but the process lives ~2s longer for hooks+analytics. 22const CLOSE_GRACE_MS = 3000 23 24/** 25 * Hybrid transport: WebSocket for reads, HTTP POST for writes. 26 * 27 * Write flow: 28 * 29 * write(stream_event) ─┐ 30 * │ (100ms timer) 31 * │ 32 * ▼ 33 * write(other) ────► uploader.enqueue() (SerialBatchEventUploader) 34 * ▲ │ 35 * writeBatch() ────────┘ │ serial, batched, retries indefinitely, 36 * │ backpressure at maxQueueSize 37 * ▼ 38 * postOnce() (single HTTP POST, throws on retryable) 39 * 40 * stream_event messages accumulate in streamEventBuffer for up to 100ms 41 * before enqueue (reduces POST count for high-volume content deltas). A 42 * non-stream write flushes any buffered stream_events first to preserve order. 
43 * 44 * Serialization + retry + backpressure are delegated to SerialBatchEventUploader 45 * (same primitive CCR uses). At most one POST in-flight; events arriving during 46 * a POST batch into the next one. On failure, the uploader re-queues and retries 47 * with exponential backoff + jitter. If the queue fills past maxQueueSize, 48 * enqueue() blocks — giving awaiting callers backpressure. 49 * 50 * Why serialize? Bridge mode fires writes via `void transport.write()` 51 * (fire-and-forget). Without this, concurrent POSTs → concurrent Firestore 52 * writes to the same document → collisions → retry storms → pages oncall. 53 */ 54export class HybridTransport extends WebSocketTransport { 55 private postUrl: string 56 private uploader: SerialBatchEventUploader<StdoutMessage> 57 58 // stream_event delay buffer — accumulates content deltas for up to 59 // BATCH_FLUSH_INTERVAL_MS before enqueueing (reduces POST count) 60 private streamEventBuffer: StdoutMessage[] = [] 61 private streamEventTimer: ReturnType<typeof setTimeout> | null = null 62 63 constructor( 64 url: URL, 65 headers: Record<string, string> = {}, 66 sessionId?: string, 67 refreshHeaders?: () => Record<string, string>, 68 options?: WebSocketTransportOptions & { 69 maxConsecutiveFailures?: number 70 onBatchDropped?: (batchSize: number, failures: number) => void 71 }, 72 ) { 73 super(url, headers, sessionId, refreshHeaders, options) 74 const { maxConsecutiveFailures, onBatchDropped } = options ?? {} 75 this.postUrl = convertWsUrlToPostUrl(url) 76 this.uploader = new SerialBatchEventUploader<StdoutMessage>({ 77 // Large cap — session-ingress accepts arbitrary batch sizes. Events 78 // naturally batch during in-flight POSTs; this just bounds the payload. 79 maxBatchSize: 500, 80 // Bridge callers use `void transport.write()` — backpressure doesn't 81 // apply (they don't await). A batch >maxQueueSize deadlocks (see 82 // SerialBatchEventUploader backpressure check). 
So set it high enough 83 // to be a memory bound only. Wire real backpressure in a follow-up 84 // once callers await. 85 maxQueueSize: 100_000, 86 baseDelayMs: 500, 87 maxDelayMs: 8000, 88 jitterMs: 1000, 89 // Optional cap so a persistently-failing server can't pin the drain 90 // loop for the lifetime of the process. Undefined = indefinite retry. 91 // replBridge sets this; the 1P transportUtils path does not. 92 maxConsecutiveFailures, 93 onBatchDropped: (batchSize, failures) => { 94 logForDiagnosticsNoPII( 95 'error', 96 'cli_hybrid_batch_dropped_max_failures', 97 { 98 batchSize, 99 failures, 100 }, 101 ) 102 onBatchDropped?.(batchSize, failures) 103 }, 104 send: batch => this.postOnce(batch), 105 }) 106 logForDebugging(`HybridTransport: POST URL = ${this.postUrl}`) 107 logForDiagnosticsNoPII('info', 'cli_hybrid_transport_initialized') 108 } 109 110 /** 111 * Enqueue a message and wait for the queue to drain. Returning flush() 112 * preserves the contract that `await write()` resolves after the event is 113 * POSTed (relied on by tests and replBridge's initial flush). Fire-and-forget 114 * callers (`void transport.write()`) are unaffected — they don't await, 115 * so the later resolution doesn't add latency. 116 */ 117 override async write(message: StdoutMessage): Promise<void> { 118 if (message.type === 'stream_event') { 119 // Delay: accumulate stream_events briefly before enqueueing. 120 // Promise resolves immediately — callers don't await stream_events. 121 this.streamEventBuffer.push(message) 122 if (!this.streamEventTimer) { 123 this.streamEventTimer = setTimeout( 124 () => this.flushStreamEvents(), 125 BATCH_FLUSH_INTERVAL_MS, 126 ) 127 } 128 return 129 } 130 // Immediate: flush any buffered stream_events (ordering), then this event. 
131 await this.uploader.enqueue([...this.takeStreamEvents(), message]) 132 return this.uploader.flush() 133 } 134 135 async writeBatch(messages: StdoutMessage[]): Promise<void> { 136 await this.uploader.enqueue([...this.takeStreamEvents(), ...messages]) 137 return this.uploader.flush() 138 } 139 140 /** Snapshot before/after writeBatch() to detect silent drops. */ 141 get droppedBatchCount(): number { 142 return this.uploader.droppedBatchCount 143 } 144 145 /** 146 * Block until all pending events are POSTed. Used by bridge's initial 147 * history flush so onStateChange('connected') fires after persistence. 148 */ 149 flush(): Promise<void> { 150 void this.uploader.enqueue(this.takeStreamEvents()) 151 return this.uploader.flush() 152 } 153 154 /** Take ownership of buffered stream_events and clear the delay timer. */ 155 private takeStreamEvents(): StdoutMessage[] { 156 if (this.streamEventTimer) { 157 clearTimeout(this.streamEventTimer) 158 this.streamEventTimer = null 159 } 160 const buffered = this.streamEventBuffer 161 this.streamEventBuffer = [] 162 return buffered 163 } 164 165 /** Delay timer fired — enqueue accumulated stream_events. */ 166 private flushStreamEvents(): void { 167 this.streamEventTimer = null 168 void this.uploader.enqueue(this.takeStreamEvents()) 169 } 170 171 override close(): void { 172 if (this.streamEventTimer) { 173 clearTimeout(this.streamEventTimer) 174 this.streamEventTimer = null 175 } 176 this.streamEventBuffer = [] 177 // Grace period for queued writes — fallback. replBridge teardown now 178 // awaits archive between write and close (see CLOSE_GRACE_MS), so 179 // archive latency is the primary drain window and this is a last 180 // resort. Keep close() sync (returns immediately) but defer 181 // uploader.close() so any remaining queue gets a chance to finish. 
182 const uploader = this.uploader 183 let graceTimer: ReturnType<typeof setTimeout> | undefined 184 void Promise.race([ 185 uploader.flush(), 186 new Promise<void>(r => { 187 // eslint-disable-next-line no-restricted-syntax -- need timer ref for clearTimeout 188 graceTimer = setTimeout(r, CLOSE_GRACE_MS) 189 }), 190 ]).finally(() => { 191 clearTimeout(graceTimer) 192 uploader.close() 193 }) 194 super.close() 195 } 196 197 /** 198 * Single-attempt POST. Throws on retryable failures (429, 5xx, network) 199 * so SerialBatchEventUploader re-queues and retries. Returns on success 200 * and on permanent failures (4xx non-429, no token) so the uploader moves on. 201 */ 202 private async postOnce(events: StdoutMessage[]): Promise<void> { 203 const sessionToken = getSessionIngressAuthToken() 204 if (!sessionToken) { 205 logForDebugging('HybridTransport: No session token available for POST') 206 logForDiagnosticsNoPII('warn', 'cli_hybrid_post_no_token') 207 return 208 } 209 210 const headers: Record<string, string> = { 211 Authorization: `Bearer ${sessionToken}`, 212 'Content-Type': 'application/json', 213 } 214 215 let response 216 try { 217 response = await axios.post( 218 this.postUrl, 219 { events }, 220 { 221 headers, 222 validateStatus: () => true, 223 timeout: POST_TIMEOUT_MS, 224 }, 225 ) 226 } catch (error) { 227 const axiosError = error as AxiosError 228 logForDebugging(`HybridTransport: POST error: ${axiosError.message}`) 229 logForDiagnosticsNoPII('warn', 'cli_hybrid_post_network_error') 230 throw error 231 } 232 233 if (response.status >= 200 && response.status < 300) { 234 logForDebugging(`HybridTransport: POST success count=${events.length}`) 235 return 236 } 237 238 // 4xx (except 429) are permanent — drop, don't retry. 
239 if ( 240 response.status >= 400 && 241 response.status < 500 && 242 response.status !== 429 243 ) { 244 logForDebugging( 245 `HybridTransport: POST returned ${response.status} (permanent), dropping`, 246 ) 247 logForDiagnosticsNoPII('warn', 'cli_hybrid_post_client_error', { 248 status: response.status, 249 }) 250 return 251 } 252 253 // 429 / 5xx — retryable. Throw so uploader re-queues and backs off. 254 logForDebugging( 255 `HybridTransport: POST returned ${response.status} (retryable)`, 256 ) 257 logForDiagnosticsNoPII('warn', 'cli_hybrid_post_retryable_error', { 258 status: response.status, 259 }) 260 throw new Error(`POST failed with ${response.status}`) 261 } 262} 263 264/** 265 * Convert a WebSocket URL to the HTTP POST endpoint URL. 266 * From: wss://api.example.com/v2/session_ingress/ws/<session_id> 267 * To: https://api.example.com/v2/session_ingress/session/<session_id>/events 268 */ 269function convertWsUrlToPostUrl(wsUrl: URL): string { 270 const protocol = wsUrl.protocol === 'wss:' ? 'https:' : 'http:' 271 272 // Replace /ws/ with /session/ and append /events 273 let pathname = wsUrl.pathname 274 pathname = pathname.replace('/ws/', '/session/') 275 if (!pathname.endsWith('/events')) { 276 pathname = pathname.endsWith('/') 277 ? pathname + 'events' 278 : pathname + '/events' 279 } 280 281 return `${protocol}//${wsUrl.host}${pathname}${wsUrl.search}` 282}