(Alleged) Leaked source of Claude Code
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 514 lines 17 kB view raw
1import axios, { type AxiosError } from 'axios' 2import type { UUID } from 'crypto' 3import { getOauthConfig } from '../../constants/oauth.js' 4import type { Entry, TranscriptMessage } from '../../types/logs.js' 5import { logForDebugging } from '../../utils/debug.js' 6import { logForDiagnosticsNoPII } from '../../utils/diagLogs.js' 7import { isEnvTruthy } from '../../utils/envUtils.js' 8import { logError } from '../../utils/log.js' 9import { sequential } from '../../utils/sequential.js' 10import { getSessionIngressAuthToken } from '../../utils/sessionIngressAuth.js' 11import { sleep } from '../../utils/sleep.js' 12import { jsonStringify } from '../../utils/slowOperations.js' 13import { getOAuthHeaders } from '../../utils/teleport/api.js' 14 15interface SessionIngressError { 16 error?: { 17 message?: string 18 type?: string 19 } 20} 21 22// Module-level state 23const lastUuidMap: Map<string, UUID> = new Map() 24 25const MAX_RETRIES = 10 26const BASE_DELAY_MS = 500 27 28// Per-session sequential wrappers to prevent concurrent log writes 29const sequentialAppendBySession: Map< 30 string, 31 ( 32 entry: TranscriptMessage, 33 url: string, 34 headers: Record<string, string>, 35 ) => Promise<boolean> 36> = new Map() 37 38/** 39 * Gets or creates a sequential wrapper for a session 40 * This ensures that log appends for a session are processed one at a time 41 */ 42function getOrCreateSequentialAppend(sessionId: string) { 43 let sequentialAppend = sequentialAppendBySession.get(sessionId) 44 if (!sequentialAppend) { 45 sequentialAppend = sequential( 46 async ( 47 entry: TranscriptMessage, 48 url: string, 49 headers: Record<string, string>, 50 ) => await appendSessionLogImpl(sessionId, entry, url, headers), 51 ) 52 sequentialAppendBySession.set(sessionId, sequentialAppend) 53 } 54 return sequentialAppend 55} 56 57/** 58 * Internal implementation of appendSessionLog with retry logic 59 * Retries on transient errors (network, 5xx, 429). On 409, adopts the server's 60 * last UUID and retries (handles stale state from killed process's in-flight 61 * requests). Fails immediately on 401. 62 */ 63async function appendSessionLogImpl( 64 sessionId: string, 65 entry: TranscriptMessage, 66 url: string, 67 headers: Record<string, string>, 68): Promise<boolean> { 69 for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) { 70 try { 71 const lastUuid = lastUuidMap.get(sessionId) 72 const requestHeaders = { ...headers } 73 if (lastUuid) { 74 requestHeaders['Last-Uuid'] = lastUuid 75 } 76 77 const response = await axios.put(url, entry, { 78 headers: requestHeaders, 79 validateStatus: status => status < 500, 80 }) 81 82 if (response.status === 200 || response.status === 201) { 83 lastUuidMap.set(sessionId, entry.uuid) 84 logForDebugging( 85 `Successfully persisted session log entry for session ${sessionId}`, 86 ) 87 return true 88 } 89 90 if (response.status === 409) { 91 // Check if our entry was actually stored (server returned 409 but entry exists) 92 // This handles the scenario where entry was stored but client received an error 93 // response, causing lastUuidMap to be stale 94 const serverLastUuid = response.headers['x-last-uuid'] 95 if (serverLastUuid === entry.uuid) { 96 // Our entry IS the last entry on server - it was stored successfully previously 97 lastUuidMap.set(sessionId, entry.uuid) 98 logForDebugging( 99 `Session entry ${entry.uuid} already present on server, recovering from stale state`, 100 ) 101 logForDiagnosticsNoPII('info', 'session_persist_recovered_from_409') 102 return true 103 } 104 105 // Another writer (e.g. in-flight request from a killed process) 106 // advanced the server's chain. Try to adopt the server's last UUID 107 // from the response header, or re-fetch the session to discover it. 108 if (serverLastUuid) { 109 lastUuidMap.set(sessionId, serverLastUuid as UUID) 110 logForDebugging( 111 `Session 409: adopting server lastUuid=${serverLastUuid} from header, retrying entry ${entry.uuid}`, 112 ) 113 } else { 114 // Server didn't return x-last-uuid (e.g. v1 endpoint). Re-fetch 115 // the session to discover the current head of the append chain. 116 const logs = await fetchSessionLogsFromUrl(sessionId, url, headers) 117 const adoptedUuid = findLastUuid(logs) 118 if (adoptedUuid) { 119 lastUuidMap.set(sessionId, adoptedUuid) 120 logForDebugging( 121 `Session 409: re-fetched ${logs!.length} entries, adopting lastUuid=${adoptedUuid}, retrying entry ${entry.uuid}`, 122 ) 123 } else { 124 // Can't determine server state — give up 125 const errorData = response.data as SessionIngressError 126 const errorMessage = 127 errorData.error?.message || 'Concurrent modification detected' 128 logError( 129 new Error( 130 `Session persistence conflict: UUID mismatch for session ${sessionId}, entry ${entry.uuid}. ${errorMessage}`, 131 ), 132 ) 133 logForDiagnosticsNoPII( 134 'error', 135 'session_persist_fail_concurrent_modification', 136 ) 137 return false 138 } 139 } 140 logForDiagnosticsNoPII('info', 'session_persist_409_adopt_server_uuid') 141 continue // retry with updated lastUuid 142 } 143 144 if (response.status === 401) { 145 logForDebugging('Session token expired or invalid') 146 logForDiagnosticsNoPII('error', 'session_persist_fail_bad_token') 147 return false // Non-retryable 148 } 149 150 // Other 4xx (429, etc.) - retryable 151 logForDebugging( 152 `Failed to persist session log: ${response.status} ${response.statusText}`, 153 ) 154 logForDiagnosticsNoPII('error', 'session_persist_fail_status', { 155 status: response.status, 156 attempt, 157 }) 158 } catch (error) { 159 // Network errors, 5xx - retryable 160 const axiosError = error as AxiosError<SessionIngressError> 161 logError(new Error(`Error persisting session log: ${axiosError.message}`)) 162 logForDiagnosticsNoPII('error', 'session_persist_fail_status', { 163 status: axiosError.status, 164 attempt, 165 }) 166 } 167 168 if (attempt === MAX_RETRIES) { 169 logForDebugging(`Remote persistence failed after ${MAX_RETRIES} attempts`) 170 logForDiagnosticsNoPII( 171 'error', 172 'session_persist_error_retries_exhausted', 173 { attempt }, 174 ) 175 return false 176 } 177 178 const delayMs = Math.min(BASE_DELAY_MS * Math.pow(2, attempt - 1), 8000) 179 logForDebugging( 180 `Remote persistence attempt ${attempt}/${MAX_RETRIES} failed, retrying in ${delayMs}ms…`, 181 ) 182 await sleep(delayMs) 183 } 184 185 return false 186} 187 188/** 189 * Append a log entry to the session using JWT token 190 * Uses optimistic concurrency control with Last-Uuid header 191 * Ensures sequential execution per session to prevent race conditions 192 */ 193export async function appendSessionLog( 194 sessionId: string, 195 entry: TranscriptMessage, 196 url: string, 197): Promise<boolean> { 198 const sessionToken = getSessionIngressAuthToken() 199 if (!sessionToken) { 200 logForDebugging('No session token available for session persistence') 201 logForDiagnosticsNoPII('error', 'session_persist_fail_jwt_no_token') 202 return false 203 } 204 205 const headers: Record<string, string> = { 206 Authorization: `Bearer ${sessionToken}`, 207 'Content-Type': 'application/json', 208 } 209 210 const sequentialAppend = getOrCreateSequentialAppend(sessionId) 211 return sequentialAppend(entry, url, headers) 212} 213 214/** 215 * Get all session logs for hydration 216 */ 217export async function getSessionLogs( 218 sessionId: string, 219 url: string, 220): Promise<Entry[] | null> { 221 const sessionToken = getSessionIngressAuthToken() 222 if (!sessionToken) { 223 logForDebugging('No session token available for fetching session logs') 224 logForDiagnosticsNoPII('error', 'session_get_fail_no_token') 225 return null 226 } 227 228 const headers = { Authorization: `Bearer ${sessionToken}` } 229 const logs = await fetchSessionLogsFromUrl(sessionId, url, headers) 230 231 if (logs && logs.length > 0) { 232 // Update our lastUuid to the last entry's UUID 233 const lastEntry = logs.at(-1) 234 if (lastEntry && 'uuid' in lastEntry && lastEntry.uuid) { 235 lastUuidMap.set(sessionId, lastEntry.uuid) 236 } 237 } 238 239 return logs 240} 241 242/** 243 * Get all session logs for hydration via OAuth 244 * Used for teleporting sessions from the Sessions API 245 */ 246export async function getSessionLogsViaOAuth( 247 sessionId: string, 248 accessToken: string, 249 orgUUID: string, 250): Promise<Entry[] | null> { 251 const url = `${getOauthConfig().BASE_API_URL}/v1/session_ingress/session/${sessionId}` 252 logForDebugging(`[session-ingress] Fetching session logs from: ${url}`) 253 const headers = { 254 ...getOAuthHeaders(accessToken), 255 'x-organization-uuid': orgUUID, 256 } 257 const result = await fetchSessionLogsFromUrl(sessionId, url, headers) 258 return result 259} 260 261/** 262 * Response shape from GET /v1/code/sessions/{id}/teleport-events. 263 * WorkerEvent.payload IS the Entry (TranscriptMessage struct) — the CLI 264 * writes it via AddWorkerEvent, the server stores it opaque, we read it 265 * back here. 266 */ 267type TeleportEventsResponse = { 268 data: Array<{ 269 event_id: string 270 event_type: string 271 is_compaction: boolean 272 payload: Entry | null 273 created_at: string 274 }> 275 // Unset when there are no more pages — this IS the end-of-stream 276 // signal (no separate has_more field). 277 next_cursor?: string 278} 279 280/** 281 * Get worker events (transcript) via the CCR v2 Sessions API. Replaces 282 * getSessionLogsViaOAuth once session-ingress is retired. 283 * 284 * The server dispatches per-session: Spanner for v2-native sessions, 285 * threadstore for pre-backfill session_* IDs. The cursor is opaque to us — 286 * echo it back until next_cursor is unset. 287 * 288 * Paginated (500/page default, server max 1000). session-ingress's one-shot 289 * 50k is gone; we loop. 290 */ 291export async function getTeleportEvents( 292 sessionId: string, 293 accessToken: string, 294 orgUUID: string, 295): Promise<Entry[] | null> { 296 const baseUrl = `${getOauthConfig().BASE_API_URL}/v1/code/sessions/${sessionId}/teleport-events` 297 const headers = { 298 ...getOAuthHeaders(accessToken), 299 'x-organization-uuid': orgUUID, 300 } 301 302 logForDebugging(`[teleport] Fetching events from: ${baseUrl}`) 303 304 const all: Entry[] = [] 305 let cursor: string | undefined 306 let pages = 0 307 308 // Infinite-loop guard: 1000/page × 100 pages = 100k events. Larger than 309 // session-ingress's 50k one-shot. If we hit this, something's wrong 310 // (server not advancing cursor) — bail rather than hang. 311 const maxPages = 100 312 313 while (pages < maxPages) { 314 const params: Record<string, string | number> = { limit: 1000 } 315 if (cursor !== undefined) { 316 params.cursor = cursor 317 } 318 319 let response 320 try { 321 response = await axios.get<TeleportEventsResponse>(baseUrl, { 322 headers, 323 params, 324 timeout: 20000, 325 validateStatus: status => status < 500, 326 }) 327 } catch (e) { 328 const err = e as AxiosError 329 logError(new Error(`Teleport events fetch failed: ${err.message}`)) 330 logForDiagnosticsNoPII('error', 'teleport_events_fetch_fail') 331 return null 332 } 333 334 if (response.status === 404) { 335 // 404 on page 0 is ambiguous during the migration window: 336 // (a) Session genuinely not found (not in Spanner AND not in 337 // threadstore) — nothing to fetch. 338 // (b) Route-level 404: endpoint not deployed yet, or session is 339 // a threadstore session not yet backfilled into Spanner. 340 // We can't tell them apart from the response alone. Returning null 341 // lets the caller fall back to session-ingress, which will correctly 342 // return empty for case (a) and data for case (b). Once the backfill 343 // is complete and session-ingress is gone, the fallback also returns 344 // null → same "Failed to fetch session logs" error as today. 345 // 346 // 404 mid-pagination (pages > 0) means session was deleted between 347 // pages — return what we have. 348 logForDebugging( 349 `[teleport] Session ${sessionId} not found (page ${pages})`, 350 ) 351 logForDiagnosticsNoPII('warn', 'teleport_events_not_found') 352 return pages === 0 ? null : all 353 } 354 355 if (response.status === 401) { 356 logForDiagnosticsNoPII('error', 'teleport_events_bad_token') 357 throw new Error( 358 'Your session has expired. Please run /login to sign in again.', 359 ) 360 } 361 362 if (response.status !== 200) { 363 logError( 364 new Error( 365 `Teleport events returned ${response.status}: ${jsonStringify(response.data)}`, 366 ), 367 ) 368 logForDiagnosticsNoPII('error', 'teleport_events_bad_status') 369 return null 370 } 371 372 const { data, next_cursor } = response.data 373 if (!Array.isArray(data)) { 374 logError( 375 new Error( 376 `Teleport events invalid response shape: ${jsonStringify(response.data)}`, 377 ), 378 ) 379 logForDiagnosticsNoPII('error', 'teleport_events_invalid_shape') 380 return null 381 } 382 383 // payload IS the Entry. null payload happens for threadstore non-generic 384 // events (server skips them) or encryption failures — skip here too. 385 for (const ev of data) { 386 if (ev.payload !== null) { 387 all.push(ev.payload) 388 } 389 } 390 391 pages++ 392 // == null covers both `null` and `undefined` — the proto omits the 393 // field at end-of-stream, but some serializers emit `null`. Strict 394 // `=== undefined` would loop forever on `null` (cursor=null in query 395 // params stringifies to "null", which the server rejects or echoes). 396 if (next_cursor == null) { 397 break 398 } 399 cursor = next_cursor 400 } 401 402 if (pages >= maxPages) { 403 // Don't fail — return what we have. Better to teleport with a 404 // truncated transcript than not at all. 405 logError( 406 new Error(`Teleport events hit page cap (${maxPages}) for ${sessionId}`), 407 ) 408 logForDiagnosticsNoPII('warn', 'teleport_events_page_cap') 409 } 410 411 logForDebugging( 412 `[teleport] Fetched ${all.length} events over ${pages} page(s) for ${sessionId}`, 413 ) 414 return all 415} 416 417/** 418 * Shared implementation for fetching session logs from a URL 419 */ 420async function fetchSessionLogsFromUrl( 421 sessionId: string, 422 url: string, 423 headers: Record<string, string>, 424): Promise<Entry[] | null> { 425 try { 426 const response = await axios.get(url, { 427 headers, 428 timeout: 20000, 429 validateStatus: status => status < 500, 430 params: isEnvTruthy(process.env.CLAUDE_AFTER_LAST_COMPACT) 431 ? { after_last_compact: true } 432 : undefined, 433 }) 434 435 if (response.status === 200) { 436 const data = response.data 437 438 // Validate the response structure 439 if (!data || typeof data !== 'object' || !Array.isArray(data.loglines)) { 440 logError( 441 new Error( 442 `Invalid session logs response format: ${jsonStringify(data)}`, 443 ), 444 ) 445 logForDiagnosticsNoPII('error', 'session_get_fail_invalid_response') 446 return null 447 } 448 449 const logs = data.loglines as Entry[] 450 logForDebugging( 451 `Fetched ${logs.length} session logs for session ${sessionId}`, 452 ) 453 return logs 454 } 455 456 if (response.status === 404) { 457 logForDebugging(`No existing logs for session ${sessionId}`) 458 logForDiagnosticsNoPII('warn', 'session_get_no_logs_for_session') 459 return [] 460 } 461 462 if (response.status === 401) { 463 logForDebugging('Auth token expired or invalid') 464 logForDiagnosticsNoPII('error', 'session_get_fail_bad_token') 465 throw new Error( 466 'Your session has expired. Please run /login to sign in again.', 467 ) 468 } 469 470 logForDebugging( 471 `Failed to fetch session logs: ${response.status} ${response.statusText}`, 472 ) 473 logForDiagnosticsNoPII('error', 'session_get_fail_status', { 474 status: response.status, 475 }) 476 return null 477 } catch (error) { 478 const axiosError = error as AxiosError<SessionIngressError> 479 logError(new Error(`Error fetching session logs: ${axiosError.message}`)) 480 logForDiagnosticsNoPII('error', 'session_get_fail_status', { 481 status: axiosError.status, 482 }) 483 return null 484 } 485} 486 487/** 488 * Walk backward through entries to find the last one with a uuid. 489 * Some entry types (SummaryMessage, TagMessage) don't have one. 490 */ 491function findLastUuid(logs: Entry[] | null): UUID | undefined { 492 if (!logs) { 493 return undefined 494 } 495 const entry = logs.findLast(e => 'uuid' in e && e.uuid) 496 return entry && 'uuid' in entry ? (entry.uuid as UUID) : undefined 497} 498 499/** 500 * Clear cached state for a session 501 */ 502export function clearSession(sessionId: string): void { 503 lastUuidMap.delete(sessionId) 504 sequentialAppendBySession.delete(sessionId) 505} 506 507/** 508 * Clear all cached session state (all sessions). 509 * Use this on /clear to free sub-agent session entries. 510 */ 511export function clearAllSessions(): void { 512 lastUuidMap.clear() 513 sequentialAppendBySession.clear() 514}