The Claude Code sourcemaps leaked March 31.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 1256 lines 44 kB view raw
1/** 2 * Team Memory Sync Service 3 * 4 * Syncs team memory files between the local filesystem and the server API. 5 * Team memory is scoped per-repo (identified by git remote hash) and shared 6 * across all authenticated org members. 7 * 8 * API contract (anthropic/anthropic#250711 + #283027): 9 * GET /api/claude_code/team_memory?repo={owner/repo} → TeamMemoryData (includes entryChecksums) 10 * GET /api/claude_code/team_memory?repo={owner/repo}&view=hashes → metadata + entryChecksums only (no entry bodies) 11 * PUT /api/claude_code/team_memory?repo={owner/repo} → upload entries (upsert semantics) 12 * 404 = no data exists yet 13 * 14 * Sync semantics: 15 * - Pull overwrites local files with server content (server wins per-key). 16 * - Push uploads only keys whose content hash differs from serverChecksums 17 * (delta upload). Server uses upsert: keys not in the PUT are preserved. 18 * - File deletions do NOT propagate: deleting a local file won't remove it 19 * from the server, and the next pull will restore it locally. 20 * 21 * State management: 22 * All mutable state (ETag tracking, watcher suppression) lives in a 23 * SyncState object created by the caller and threaded through every call. 24 * This avoids module-level mutable state and gives tests natural isolation. 
25 */ 26 27import axios from 'axios' 28import { createHash } from 'crypto' 29import { mkdir, readdir, readFile, stat, writeFile } from 'fs/promises' 30import { join, relative, sep } from 'path' 31import { 32 CLAUDE_AI_INFERENCE_SCOPE, 33 CLAUDE_AI_PROFILE_SCOPE, 34 getOauthConfig, 35 OAUTH_BETA_HEADER, 36} from '../../constants/oauth.js' 37import { 38 getTeamMemPath, 39 PathTraversalError, 40 validateTeamMemKey, 41} from '../../memdir/teamMemPaths.js' 42import { count } from '../../utils/array.js' 43import { 44 checkAndRefreshOAuthTokenIfNeeded, 45 getClaudeAIOAuthTokens, 46} from '../../utils/auth.js' 47import { logForDebugging } from '../../utils/debug.js' 48import { classifyAxiosError } from '../../utils/errors.js' 49import { getGithubRepo } from '../../utils/git.js' 50import { 51 getAPIProvider, 52 isFirstPartyAnthropicBaseUrl, 53} from '../../utils/model/providers.js' 54import { sleep } from '../../utils/sleep.js' 55import { jsonStringify } from '../../utils/slowOperations.js' 56import { getClaudeCodeUserAgent } from '../../utils/userAgent.js' 57import { logEvent } from '../analytics/index.js' 58import type { AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS } from '../analytics/metadata.js' 59import { getRetryDelay } from '../api/withRetry.js' 60import { scanForSecrets } from './secretScanner.js' 61import { 62 type SkippedSecretFile, 63 TeamMemoryDataSchema, 64 type TeamMemoryHashesResult, 65 type TeamMemorySyncFetchResult, 66 type TeamMemorySyncPushResult, 67 type TeamMemorySyncUploadResult, 68 TeamMemoryTooManyEntriesSchema, 69} from './types.js' 70 71const TEAM_MEMORY_SYNC_TIMEOUT_MS = 30_000 72// Per-entry size cap — server default from anthropic/anthropic#293258. 73// Pre-filtering oversized entries saves bandwidth: the structured 413 for 74// this case doesn't give us anything to learn (one file is just too big). 
75const MAX_FILE_SIZE_BYTES = 250_000 76// No client-side DEFAULT_MAX_ENTRIES: the server's entry-count cap is 77// GB-tunable per-org (claude_code_team_memory_limits), so any compile-time 78// constant here will drift. We only truncate after learning the effective 79// limit from a structured 413's extra_details.max_entries. 80// Gateway body-size cap. The API gateway rejects PUT bodies over ~256-512KB 81// with an unstructured (HTML) 413 before the request reaches the app server — 82// distinguishable from the app's structured entry-count 413 only by latency 83// (~750ms gateway vs ~2.3s app on comparable payloads). #21969 removed the 84// client entry-count cap; cold pushes from heavy users then sent 300KB-1.4MB 85// bodies and hit this. 200KB leaves headroom under the observed threshold 86// and keeps a single-entry-at-MAX_FILE_SIZE_BYTES solo batch (~250KB) just 87// under the real gateway limit. Batches larger than this are split into 88// sequential PUTs — server upsert-merge semantics make that safe. 89const MAX_PUT_BODY_BYTES = 200_000 90const MAX_RETRIES = 3 91const MAX_CONFLICT_RETRIES = 2 92 93// ─── Sync state ───────────────────────────────────────────── 94 95/** 96 * Mutable state for the team memory sync service. 97 * Created once per session by the watcher and passed to all sync functions. 98 * Tests create a fresh instance per test for isolation. 99 */ 100export type SyncState = { 101 /** Last known server checksum (ETag) for conditional requests. */ 102 lastKnownChecksum: string | null 103 /** 104 * Per-key content hash (`sha256:<hex>`) of what we believe the server 105 * currently holds. Populated from server-provided entryChecksums on pull 106 * and from local hashes on successful push. Used to compute the delta on 107 * push — only keys whose local hash differs are uploaded. 
108 */ 109 serverChecksums: Map<string, string> 110 /** 111 * Server-enforced max_entries cap, learned from a structured 413 response 112 * (anthropic/anthropic#293258 adds error_code + extra_details.max_entries). 113 * Stays null until a 413 is observed — the server's cap is GB-tunable 114 * per-org so there is no correct client-side default. While null, 115 * readLocalTeamMemory sends everything and lets the server be 116 * authoritative (it rejects atomically). 117 */ 118 serverMaxEntries: number | null 119} 120 121export function createSyncState(): SyncState { 122 return { 123 lastKnownChecksum: null, 124 serverChecksums: new Map(), 125 serverMaxEntries: null, 126 } 127} 128 129/** 130 * Compute `sha256:<hex>` over the UTF-8 bytes of the given content. 131 * Format matches the server's entryChecksums values (anthropic/anthropic#283027) 132 * so local-vs-server comparison works by direct string equality. 133 */ 134export function hashContent(content: string): string { 135 return 'sha256:' + createHash('sha256').update(content, 'utf8').digest('hex') 136} 137 138/** 139 * Type guard narrowing an unknown error to a Node.js errno-style exception. 140 * Uses `in` narrowing so no `as` cast is needed at call sites. 141 */ 142function isErrnoException(e: unknown): e is NodeJS.ErrnoException { 143 return e instanceof Error && 'code' in e && typeof e.code === 'string' 144} 145 146// ─── Auth & endpoint ───────────────────────────────────────── 147 148/** 149 * Check if user is authenticated with first-party OAuth (required for team memory sync). 
150 */ 151function isUsingOAuth(): boolean { 152 if (getAPIProvider() !== 'firstParty' || !isFirstPartyAnthropicBaseUrl()) { 153 return false 154 } 155 const tokens = getClaudeAIOAuthTokens() 156 return Boolean( 157 tokens?.accessToken && 158 tokens.scopes?.includes(CLAUDE_AI_INFERENCE_SCOPE) && 159 tokens.scopes.includes(CLAUDE_AI_PROFILE_SCOPE), 160 ) 161} 162 163function getTeamMemorySyncEndpoint(repoSlug: string): string { 164 const baseUrl = 165 process.env.TEAM_MEMORY_SYNC_URL || getOauthConfig().BASE_API_URL 166 return `${baseUrl}/api/claude_code/team_memory?repo=${encodeURIComponent(repoSlug)}` 167} 168 169function getAuthHeaders(): { 170 headers?: Record<string, string> 171 error?: string 172} { 173 const oauthTokens = getClaudeAIOAuthTokens() 174 if (oauthTokens?.accessToken) { 175 return { 176 headers: { 177 Authorization: `Bearer ${oauthTokens.accessToken}`, 178 'anthropic-beta': OAUTH_BETA_HEADER, 179 'User-Agent': getClaudeCodeUserAgent(), 180 }, 181 } 182 } 183 return { error: 'No OAuth token available for team memory sync' } 184} 185 186// ─── Fetch (pull) ──────────────────────────────────────────── 187 188async function fetchTeamMemoryOnce( 189 state: SyncState, 190 repoSlug: string, 191 etag?: string | null, 192): Promise<TeamMemorySyncFetchResult> { 193 try { 194 await checkAndRefreshOAuthTokenIfNeeded() 195 196 const auth = getAuthHeaders() 197 if (auth.error) { 198 return { 199 success: false, 200 error: auth.error, 201 skipRetry: true, 202 errorType: 'auth', 203 } 204 } 205 206 const headers: Record<string, string> = { ...auth.headers } 207 if (etag) { 208 headers['If-None-Match'] = `"${etag.replace(/"/g, '')}"` 209 } 210 211 const endpoint = getTeamMemorySyncEndpoint(repoSlug) 212 const response = await axios.get(endpoint, { 213 headers, 214 timeout: TEAM_MEMORY_SYNC_TIMEOUT_MS, 215 validateStatus: status => 216 status === 200 || status === 304 || status === 404, 217 }) 218 219 if (response.status === 304) { 220 
logForDebugging('team-memory-sync: not modified (304)', { 221 level: 'debug', 222 }) 223 return { success: true, notModified: true, checksum: etag ?? undefined } 224 } 225 226 if (response.status === 404) { 227 logForDebugging('team-memory-sync: no remote data (404)', { 228 level: 'debug', 229 }) 230 state.lastKnownChecksum = null 231 return { success: true, isEmpty: true } 232 } 233 234 const parsed = TeamMemoryDataSchema().safeParse(response.data) 235 if (!parsed.success) { 236 logForDebugging('team-memory-sync: invalid response format', { 237 level: 'warn', 238 }) 239 return { 240 success: false, 241 error: 'Invalid team memory response format', 242 skipRetry: true, 243 errorType: 'parse', 244 } 245 } 246 247 // Extract checksum from response data or ETag header 248 const responseChecksum = 249 parsed.data.checksum || 250 response.headers['etag']?.replace(/^"|"$/g, '') || 251 undefined 252 if (responseChecksum) { 253 state.lastKnownChecksum = responseChecksum 254 } 255 256 logForDebugging( 257 `team-memory-sync: fetched successfully (checksum: ${responseChecksum ?? 'none'})`, 258 { level: 'debug' }, 259 ) 260 return { 261 success: true, 262 data: parsed.data, 263 isEmpty: false, 264 checksum: responseChecksum, 265 } 266 } catch (error) { 267 const { kind, status, message } = classifyAxiosError(error) 268 const body = axios.isAxiosError(error) 269 ? JSON.stringify(error.response?.data ?? 
'') 270 : '' 271 if (kind !== 'other') { 272 logForDebugging(`team-memory-sync: fetch error ${status}: ${body}`, { 273 level: 'warn', 274 }) 275 } 276 switch (kind) { 277 case 'auth': 278 return { 279 success: false, 280 error: `Not authorized for team memory sync: ${body}`, 281 skipRetry: true, 282 errorType: 'auth', 283 httpStatus: status, 284 } 285 case 'timeout': 286 return { 287 success: false, 288 error: 'Team memory sync request timeout', 289 errorType: 'timeout', 290 } 291 case 'network': 292 return { 293 success: false, 294 error: 'Cannot connect to server', 295 errorType: 'network', 296 } 297 default: 298 return { 299 success: false, 300 error: message, 301 errorType: 'unknown', 302 httpStatus: status, 303 } 304 } 305 } 306} 307 308/** 309 * Fetch only per-key checksums + metadata (no entry bodies). 310 * Used for cheap serverChecksums refresh during 412 conflict resolution — avoids 311 * downloading ~300KB of content just to learn which keys changed. 312 * Requires anthropic/anthropic#283027 deployed; on failure the caller fails the 313 * push and the watcher retries on the next edit. 
314 */ 315async function fetchTeamMemoryHashes( 316 state: SyncState, 317 repoSlug: string, 318): Promise<TeamMemoryHashesResult> { 319 try { 320 await checkAndRefreshOAuthTokenIfNeeded() 321 const auth = getAuthHeaders() 322 if (auth.error) { 323 return { success: false, error: auth.error, errorType: 'auth' } 324 } 325 326 const endpoint = getTeamMemorySyncEndpoint(repoSlug) + '&view=hashes' 327 const response = await axios.get(endpoint, { 328 headers: auth.headers, 329 timeout: TEAM_MEMORY_SYNC_TIMEOUT_MS, 330 validateStatus: status => status === 200 || status === 404, 331 }) 332 333 if (response.status === 404) { 334 state.lastKnownChecksum = null 335 return { success: true, entryChecksums: {} } 336 } 337 338 const checksum = 339 response.data?.checksum || response.headers['etag']?.replace(/^"|"$/g, '') 340 const entryChecksums = response.data?.entryChecksums 341 342 // Requires anthropic/anthropic#283027. If entryChecksums is missing, 343 // treat as a probe failure — caller fails the push; watcher retries. 
344 if (!entryChecksums || typeof entryChecksums !== 'object') { 345 return { 346 success: false, 347 error: 348 'Server did not return entryChecksums (?view=hashes unsupported)', 349 errorType: 'parse', 350 } 351 } 352 353 if (checksum) { 354 state.lastKnownChecksum = checksum 355 } 356 return { 357 success: true, 358 version: response.data?.version, 359 checksum, 360 entryChecksums, 361 } 362 } catch (error) { 363 const { kind, status, message } = classifyAxiosError(error) 364 switch (kind) { 365 case 'auth': 366 return { 367 success: false, 368 error: 'Not authorized', 369 errorType: 'auth', 370 httpStatus: status, 371 } 372 case 'timeout': 373 return { success: false, error: 'Timeout', errorType: 'timeout' } 374 case 'network': 375 return { success: false, error: 'Network error', errorType: 'network' } 376 default: 377 return { 378 success: false, 379 error: message, 380 errorType: 'unknown', 381 httpStatus: status, 382 } 383 } 384 } 385} 386 387async function fetchTeamMemory( 388 state: SyncState, 389 repoSlug: string, 390 etag?: string | null, 391): Promise<TeamMemorySyncFetchResult> { 392 let lastResult: TeamMemorySyncFetchResult | null = null 393 394 for (let attempt = 1; attempt <= MAX_RETRIES + 1; attempt++) { 395 lastResult = await fetchTeamMemoryOnce(state, repoSlug, etag) 396 if (lastResult.success || lastResult.skipRetry) { 397 return lastResult 398 } 399 if (attempt > MAX_RETRIES) { 400 return lastResult 401 } 402 const delayMs = getRetryDelay(attempt) 403 logForDebugging(`team-memory-sync: retry ${attempt}/${MAX_RETRIES}`, { 404 level: 'debug', 405 }) 406 await sleep(delayMs) 407 } 408 409 return lastResult! 410} 411 412// ─── Upload (push) ─────────────────────────────────────────── 413 414/** 415 * Split a delta into PUT-sized batches under MAX_PUT_BODY_BYTES each. 
416 * 417 * Greedy bin-packing over sorted keys — sorting gives deterministic batches 418 * across calls, which matters for ETag stability if the conflict loop retries 419 * after a partial commit. The byte count is the full serialized body 420 * including JSON overhead, so what we measure is what axios sends. 421 * 422 * A single entry exceeding MAX_PUT_BODY_BYTES goes into its own solo batch 423 * (MAX_FILE_SIZE_BYTES=250K already caps individual files; a ~250K solo body 424 * is above our soft cap but below the gateway's observed real threshold). 425 */ 426export function batchDeltaByBytes( 427 delta: Record<string, string>, 428): Array<Record<string, string>> { 429 const keys = Object.keys(delta).sort() 430 if (keys.length === 0) return [] 431 432 // Fixed overhead for `{"entries":{}}` — each entry then adds its marginal 433 // bytes. jsonStringify (≡ JSON.stringify under the hood) on the raw 434 // strings handles escaping so the count matches what axios serializes. 435 const EMPTY_BODY_BYTES = Buffer.byteLength('{"entries":{}}', 'utf8') 436 const entryBytes = (k: string, v: string): number => 437 Buffer.byteLength(jsonStringify(k), 'utf8') + 438 Buffer.byteLength(jsonStringify(v), 'utf8') + 439 2 // colon + comma (comma over-counts by 1 on the last entry; harmless slack) 440 441 const batches: Array<Record<string, string>> = [] 442 let current: Record<string, string> = {} 443 let currentBytes = EMPTY_BODY_BYTES 444 445 for (const key of keys) { 446 const added = entryBytes(key, delta[key]!) 447 if ( 448 currentBytes + added > MAX_PUT_BODY_BYTES && 449 Object.keys(current).length > 0 450 ) { 451 batches.push(current) 452 current = {} 453 currentBytes = EMPTY_BODY_BYTES 454 } 455 current[key] = delta[key]! 
456 currentBytes += added 457 } 458 batches.push(current) 459 return batches 460} 461 462async function uploadTeamMemory( 463 state: SyncState, 464 repoSlug: string, 465 entries: Record<string, string>, 466 ifMatchChecksum?: string | null, 467): Promise<TeamMemorySyncUploadResult> { 468 try { 469 await checkAndRefreshOAuthTokenIfNeeded() 470 471 const auth = getAuthHeaders() 472 if (auth.error) { 473 return { success: false, error: auth.error, errorType: 'auth' } 474 } 475 476 const headers: Record<string, string> = { 477 ...auth.headers, 478 'Content-Type': 'application/json', 479 } 480 if (ifMatchChecksum) { 481 headers['If-Match'] = `"${ifMatchChecksum.replace(/"/g, '')}"` 482 } 483 484 const endpoint = getTeamMemorySyncEndpoint(repoSlug) 485 const response = await axios.put( 486 endpoint, 487 { entries }, 488 { 489 headers, 490 timeout: TEAM_MEMORY_SYNC_TIMEOUT_MS, 491 validateStatus: status => status === 200 || status === 412, 492 }, 493 ) 494 495 if (response.status === 412) { 496 logForDebugging('team-memory-sync: conflict (412 Precondition Failed)', { 497 level: 'info', 498 }) 499 return { success: false, conflict: true, error: 'ETag mismatch' } 500 } 501 502 const responseChecksum = response.data?.checksum 503 if (responseChecksum) { 504 state.lastKnownChecksum = responseChecksum 505 } 506 507 logForDebugging( 508 `team-memory-sync: uploaded ${Object.keys(entries).length} entries (checksum: ${responseChecksum ?? 'none'})`, 509 { level: 'debug' }, 510 ) 511 return { 512 success: true, 513 checksum: responseChecksum, 514 lastModified: response.data?.lastModified, 515 } 516 } catch (error) { 517 const body = axios.isAxiosError(error) 518 ? JSON.stringify(error.response?.data ?? '') 519 : '' 520 logForDebugging( 521 `team-memory-sync: upload failed: ${error instanceof Error ? 
error.message : ''} ${body}`, 522 { level: 'warn' }, 523 ) 524 const { kind, status: httpStatus, message } = classifyAxiosError(error) 525 const errorType = kind === 'http' || kind === 'other' ? 'unknown' : kind 526 let serverErrorCode: 'team_memory_too_many_entries' | undefined 527 let serverMaxEntries: number | undefined 528 let serverReceivedEntries: number | undefined 529 // Parse structured 413 (anthropic/anthropic#293258). The server's 530 // RequestTooLargeException includes error_code + extra_details with 531 // the effective max_entries (may be GB-tuned per-org). Cache it so 532 // the next push trims to the right value. 533 if (httpStatus === 413 && axios.isAxiosError(error)) { 534 const parsed = TeamMemoryTooManyEntriesSchema().safeParse( 535 error.response?.data, 536 ) 537 if (parsed.success) { 538 serverErrorCode = parsed.data.error.details.error_code 539 serverMaxEntries = parsed.data.error.details.max_entries 540 serverReceivedEntries = parsed.data.error.details.received_entries 541 } 542 } 543 return { 544 success: false, 545 error: message, 546 errorType, 547 httpStatus, 548 ...(serverErrorCode !== undefined && { serverErrorCode }), 549 ...(serverMaxEntries !== undefined && { serverMaxEntries }), 550 ...(serverReceivedEntries !== undefined && { serverReceivedEntries }), 551 } 552 } 553} 554 555// ─── Local file operations ─────────────────────────────────── 556 557/** 558 * Read all team memory files from the local directory into a flat key-value map. 559 * Keys are relative paths from the team memory directory. 560 * Empty files are included (content will be empty string). 561 * 562 * PSR M22174: Each file is scanned for credentials before inclusion 563 * using patterns from gitleaks. Files containing secrets are SKIPPED 564 * (not uploaded) and collected in skippedSecrets so the caller can 565 * warn the user. 
566 */ 567async function readLocalTeamMemory(maxEntries: number | null): Promise<{ 568 entries: Record<string, string> 569 skippedSecrets: SkippedSecretFile[] 570}> { 571 const teamDir = getTeamMemPath() 572 const entries: Record<string, string> = {} 573 const skippedSecrets: SkippedSecretFile[] = [] 574 575 async function walkDir(dir: string): Promise<void> { 576 try { 577 const dirEntries = await readdir(dir, { withFileTypes: true }) 578 await Promise.all( 579 dirEntries.map(async entry => { 580 const fullPath = join(dir, entry.name) 581 if (entry.isDirectory()) { 582 await walkDir(fullPath) 583 } else if (entry.isFile()) { 584 try { 585 const stats = await stat(fullPath) 586 if (stats.size > MAX_FILE_SIZE_BYTES) { 587 logForDebugging( 588 `team-memory-sync: skipping oversized file ${entry.name} (${stats.size} > ${MAX_FILE_SIZE_BYTES} bytes)`, 589 { level: 'info' }, 590 ) 591 return 592 } 593 const content = await readFile(fullPath, 'utf8') 594 const relPath = relative(teamDir, fullPath).replaceAll('\\', '/') 595 596 // PSR M22174: scan for secrets BEFORE adding to the upload 597 // payload. If a secret is detected, skip this file entirely 598 // so it never leaves the machine. 599 const secretMatches = scanForSecrets(content) 600 if (secretMatches.length > 0) { 601 // Report only the first match per file — one secret is 602 // enough to skip the file and we don't want to log more 603 // than necessary about credential locations. 604 const firstMatch = secretMatches[0]! 
605 skippedSecrets.push({ 606 path: relPath, 607 ruleId: firstMatch.ruleId, 608 label: firstMatch.label, 609 }) 610 logForDebugging( 611 `team-memory-sync: skipping "${relPath}" — detected ${firstMatch.label}`, 612 { level: 'warn' }, 613 ) 614 return 615 } 616 617 entries[relPath] = content 618 } catch { 619 // Skip unreadable files 620 } 621 } 622 }), 623 ) 624 } catch (e) { 625 if (isErrnoException(e)) { 626 if (e.code !== 'ENOENT' && e.code !== 'EACCES' && e.code !== 'EPERM') { 627 throw e 628 } 629 } else { 630 throw e 631 } 632 } 633 } 634 635 await walkDir(teamDir) 636 637 // Truncate only if we've LEARNED a cap from the server (via a structured 638 // 413's extra_details.max_entries — anthropic/anthropic#293258). The 639 // server's entry-count cap is GB-tunable per-org via 640 // claude_code_team_memory_limits; we have no way to know it in advance. 641 // Before the first 413 we send everything and let the server be 642 // authoritative. The server validates total stored entries after merge 643 // (not PUT body count) and rejects atomically — nothing is written on 413. 644 // 645 // Sorting before truncation is what makes delta computation work: without 646 // it, the parallel walk above picks a different N-of-M subset each push 647 // (Promise.all resolves in completion order), serverChecksums misses keys, 648 // and the "delta" balloons to near-full snapshot. With deterministic 649 // truncation, the same N keys are compared against the same server state. 650 // 651 // When disk has more files than the learned cap, alphabetically-last ones 652 // consistently never sync. When the merged (server + delta) count exceeds 653 // the cap we still fail — recovering requires soft_delete_keys. 
654 const keys = Object.keys(entries).sort() 655 if (maxEntries !== null && keys.length > maxEntries) { 656 const dropped = keys.slice(maxEntries) 657 logForDebugging( 658 `team-memory-sync: ${keys.length} local entries exceeds server cap of ${maxEntries}; ${dropped.length} file(s) will NOT sync: ${dropped.join(', ')}. Consider consolidating or removing some team memory files.`, 659 { level: 'warn' }, 660 ) 661 logEvent('tengu_team_mem_entries_capped', { 662 total_entries: keys.length, 663 dropped_count: dropped.length, 664 max_entries: maxEntries, 665 }) 666 const truncated: Record<string, string> = {} 667 for (const key of keys.slice(0, maxEntries)) { 668 truncated[key] = entries[key]! 669 } 670 return { entries: truncated, skippedSecrets } 671 } 672 return { entries, skippedSecrets } 673} 674 675/** 676 * Write remote team memory entries to the local directory. 677 * Validates every path against the team memory directory boundary. 678 * Skips entries whose on-disk content already matches, so unchanged 679 * files keep their mtime and don't spuriously invalidate the 680 * getMemoryFiles cache or trigger watcher events. 681 * 682 * Parallel: each entry is processed independently (validate + read-compare 683 * + mkdir + write). Concurrent mkdir on a shared parent is safe with 684 * recursive: true (EEXIST is swallowed). The initial pull is the long 685 * pole in startTeamMemoryWatcher — p99 was ~22s serial at 50 entries. 686 * 687 * Returns the number of files actually written. 
688 */ 689async function writeRemoteEntriesToLocal( 690 entries: Record<string, string>, 691): Promise<number> { 692 const results = await Promise.all( 693 Object.entries(entries).map(async ([relPath, content]) => { 694 let validatedPath: string 695 try { 696 validatedPath = await validateTeamMemKey(relPath) 697 } catch (e) { 698 if (e instanceof PathTraversalError) { 699 logForDebugging(`team-memory-sync: ${e.message}`, { level: 'warn' }) 700 return false 701 } 702 throw e 703 } 704 705 const sizeBytes = Buffer.byteLength(content, 'utf8') 706 if (sizeBytes > MAX_FILE_SIZE_BYTES) { 707 logForDebugging( 708 `team-memory-sync: skipping oversized remote entry "${relPath}"`, 709 { level: 'info' }, 710 ) 711 return false 712 } 713 714 // Skip if on-disk content already matches. Handles the common case 715 // where pull returns unchanged entries (skipEtagCache path, first 716 // pull of a session with warm disk state from prior session). 717 try { 718 const existing = await readFile(validatedPath, 'utf8') 719 if (existing === content) { 720 return false 721 } 722 } catch (e) { 723 if ( 724 isErrnoException(e) && 725 e.code !== 'ENOENT' && 726 e.code !== 'ENOTDIR' 727 ) { 728 logForDebugging( 729 `team-memory-sync: unexpected read error for "${relPath}": ${e.code}`, 730 { level: 'debug' }, 731 ) 732 } 733 // Fall through to write for ENOENT/ENOTDIR (file doesn't exist yet) 734 } 735 736 try { 737 const parentDir = validatedPath.substring( 738 0, 739 validatedPath.lastIndexOf(sep), 740 ) 741 await mkdir(parentDir, { recursive: true }) 742 await writeFile(validatedPath, content, 'utf8') 743 return true 744 } catch (e) { 745 logForDebugging( 746 `team-memory-sync: failed to write "${relPath}": ${e}`, 747 { level: 'warn' }, 748 ) 749 return false 750 } 751 }), 752 ) 753 754 return count(results, Boolean) 755} 756 757// ─── Public API ────────────────────────────────────────────── 758 759/** 760 * Check if team memory sync is available (requires first-party OAuth). 
761 */ 762export function isTeamMemorySyncAvailable(): boolean { 763 return isUsingOAuth() 764} 765 766/** 767 * Pull team memory from the server and write to local directory. 768 * Returns true if any files were updated. 769 */ 770export async function pullTeamMemory( 771 state: SyncState, 772 options?: { skipEtagCache?: boolean }, 773): Promise<{ 774 success: boolean 775 filesWritten: number 776 /** Number of entries the server returned, regardless of whether they were written to disk. */ 777 entryCount: number 778 notModified?: boolean 779 error?: string 780}> { 781 const skipEtagCache = options?.skipEtagCache ?? false 782 const startTime = Date.now() 783 784 if (!isUsingOAuth()) { 785 logPull(startTime, { success: false, errorType: 'no_oauth' }) 786 return { 787 success: false, 788 filesWritten: 0, 789 entryCount: 0, 790 error: 'OAuth not available', 791 } 792 } 793 794 const repoSlug = await getGithubRepo() 795 if (!repoSlug) { 796 logPull(startTime, { success: false, errorType: 'no_repo' }) 797 return { 798 success: false, 799 filesWritten: 0, 800 entryCount: 0, 801 error: 'No git remote found', 802 } 803 } 804 805 const etag = skipEtagCache ? null : state.lastKnownChecksum 806 const result = await fetchTeamMemory(state, repoSlug, etag) 807 if (!result.success) { 808 logPull(startTime, { 809 success: false, 810 errorType: result.errorType, 811 status: result.httpStatus, 812 }) 813 return { 814 success: false, 815 filesWritten: 0, 816 entryCount: 0, 817 error: result.error, 818 } 819 } 820 if (result.notModified) { 821 logPull(startTime, { success: true, notModified: true }) 822 return { success: true, filesWritten: 0, entryCount: 0, notModified: true } 823 } 824 if (result.isEmpty || !result.data) { 825 // Server has no data — clear stale serverChecksums so the next push 826 // doesn't skip entries it thinks the server already has. 
827 state.serverChecksums.clear() 828 logPull(startTime, { success: true }) 829 return { success: true, filesWritten: 0, entryCount: 0 } 830 } 831 832 const entries = result.data.content.entries 833 const responseChecksums = result.data.content.entryChecksums 834 835 // Refresh serverChecksums from server-provided per-key hashes. 836 // Requires anthropic/anthropic#283027 — if the response lacks entryChecksums 837 // (pre-deploy server), serverChecksums stays empty and the next push uploads 838 // everything; it self-corrects on push success. 839 state.serverChecksums.clear() 840 if (responseChecksums) { 841 for (const [key, hash] of Object.entries(responseChecksums)) { 842 state.serverChecksums.set(key, hash) 843 } 844 } else { 845 logForDebugging( 846 'team-memory-sync: server response missing entryChecksums (pre-#283027 deploy) — next push will be full, not delta', 847 { level: 'debug' }, 848 ) 849 } 850 851 const filesWritten = await writeRemoteEntriesToLocal(entries) 852 if (filesWritten > 0) { 853 const { clearMemoryFileCaches } = await import('../../utils/claudemd.js') 854 clearMemoryFileCaches() 855 } 856 logForDebugging(`team-memory-sync: pulled ${filesWritten} files`, { 857 level: 'info', 858 }) 859 860 logPull(startTime, { success: true, filesWritten }) 861 862 return { 863 success: true, 864 filesWritten, 865 entryCount: Object.keys(entries).length, 866 } 867} 868 869/** 870 * Push local team memory files to the server with optimistic locking. 871 * 872 * Uses delta upload: only keys whose local content hash differs from 873 * serverChecksums are included in the PUT. On 412 conflict, probes 874 * GET ?view=hashes to refresh serverChecksums, recomputes the delta 875 * (naturally excluding keys where a teammate's push matches ours), 876 * and retries. No merge, no disk writes — server-only new keys from 877 * a teammate's concurrent push propagate on the next pull. 
878 * 879 * Local-wins-on-conflict is the opposite of syncTeamMemory's pull-first 880 * semantics. This is intentional: pushTeamMemory is triggered by a local edit, 881 * and that edit must not be silently discarded just because a teammate pushed 882 * in the meantime. Content-level merge (same key, both changed) is not 883 * attempted — the local version simply overwrites the server version for that 884 * key, and the server's edit to that key is lost. This is the lesser evil: 885 * the local user is actively editing and can re-incorporate the teammate's 886 * changes, whereas silently discarding the local edit loses work the user 887 * just did with no recourse. 888 */ 889export async function pushTeamMemory( 890 state: SyncState, 891): Promise<TeamMemorySyncPushResult> { 892 const startTime = Date.now() 893 let conflictRetries = 0 894 895 if (!isUsingOAuth()) { 896 logPush(startTime, { success: false, errorType: 'no_oauth' }) 897 return { 898 success: false, 899 filesUploaded: 0, 900 error: 'OAuth not available', 901 errorType: 'no_oauth', 902 } 903 } 904 905 const repoSlug = await getGithubRepo() 906 if (!repoSlug) { 907 logPush(startTime, { success: false, errorType: 'no_repo' }) 908 return { 909 success: false, 910 filesUploaded: 0, 911 error: 'No git remote found', 912 errorType: 'no_repo', 913 } 914 } 915 916 // Read local entries once at the start. Conflict resolution does NOT re-read 917 // from disk — the delta computation against a refreshed serverChecksums naturally 918 // excludes server-origin content, so the user's local edit cannot be clobbered. 919 // Secret scanning (PSR M22174) happens here once — files with detected 920 // secrets are excluded from the upload set. 921 const localRead = await readLocalTeamMemory(state.serverMaxEntries) 922 const entries = localRead.entries 923 const skippedSecrets = localRead.skippedSecrets 924 if (skippedSecrets.length > 0) { 925 // Log a user-visible warning listing which files were skipped and why. 
926 // Don't block the push — just exclude those files. The secret VALUE is 927 // never logged, only the type label. 928 const summary = skippedSecrets 929 .map(s => `"${s.path}" (${s.label})`) 930 .join(', ') 931 logForDebugging( 932 `team-memory-sync: ${skippedSecrets.length} file(s) skipped due to detected secrets: ${summary}. Remove the secret(s) to enable sync for these files.`, 933 { level: 'warn' }, 934 ) 935 logEvent('tengu_team_mem_secret_skipped', { 936 file_count: skippedSecrets.length, 937 // Only log gitleaks rule IDs (not values, not paths — paths could 938 // leak repo structure). Comma-joined for compact single-field analytics. 939 rule_ids: skippedSecrets 940 .map(s => s.ruleId) 941 .join( 942 ',', 943 ) as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 944 }) 945 } 946 947 // Hash each local entry once. The loop recomputes the delta each iteration 948 // (serverChecksums may change after a 412 probe) but local hashes are stable. 949 const localHashes = new Map<string, string>() 950 for (const [key, content] of Object.entries(entries)) { 951 localHashes.set(key, hashContent(content)) 952 } 953 954 let sawConflict = false 955 956 for ( 957 let conflictAttempt = 0; 958 conflictAttempt <= MAX_CONFLICT_RETRIES; 959 conflictAttempt++ 960 ) { 961 // Delta: only upload keys whose content hash differs from what we believe 962 // the server holds. On first push after a fresh pull, this is exactly the 963 // user's local edits. After a 412 probe, matching hashes are excluded — 964 // server-origin content from a teammate's concurrent push is naturally 965 // dropped from the delta, so we never re-upload it. 966 const delta: Record<string, string> = {} 967 for (const [key, localHash] of localHashes) { 968 if (state.serverChecksums.get(key) !== localHash) { 969 delta[key] = entries[key]! 970 } 971 } 972 const deltaCount = Object.keys(delta).length 973 974 if (deltaCount === 0) { 975 // Nothing to upload. 
This is the expected fast path after a fresh pull 976 // with no local edits, and also the convergence point after a 412 where 977 // the teammate's push was a strict superset of ours. 978 logPush(startTime, { 979 success: true, 980 conflict: sawConflict, 981 conflictRetries, 982 }) 983 return { 984 success: true, 985 filesUploaded: 0, 986 ...(skippedSecrets.length > 0 && { skippedSecrets }), 987 } 988 } 989 990 // Split the delta into PUT-sized batches to stay under the gateway's 991 // body-size limit. Typical deltas (1-3 edited files) land in one batch; 992 // cold pushes with many files are where this earns its keep. Each batch 993 // is a complete PUT that upserts its keys independently — if batch N 994 // fails, batches 1..N-1 are already committed server-side. Updating 995 // serverChecksums after each success means the outer conflict-loop retry 996 // naturally resumes from the uncommitted tail (those keys still differ). 997 // state.lastKnownChecksum is updated inside uploadTeamMemory on each 998 // 200, so the ETag chain threads through the batches automatically. 999 const batches = batchDeltaByBytes(delta) 1000 let filesUploaded = 0 1001 let result: TeamMemorySyncUploadResult | undefined 1002 1003 for (const batch of batches) { 1004 result = await uploadTeamMemory( 1005 state, 1006 repoSlug, 1007 batch, 1008 state.lastKnownChecksum, 1009 ) 1010 if (!result.success) break 1011 1012 for (const key of Object.keys(batch)) { 1013 state.serverChecksums.set(key, localHashes.get(key)!) 1014 } 1015 filesUploaded += Object.keys(batch).length 1016 } 1017 // batches is non-empty (deltaCount > 0 guaranteed by the check above), 1018 // so the loop executed at least once. 1019 result = result! 1020 1021 if (result.success) { 1022 // Server-side delta propagation to disk (server-only new keys from a 1023 // teammate's concurrent push) happens on the next pull — we only 1024 // fetched hashes during conflict resolution, not bodies. 
1025 logForDebugging( 1026 batches.length > 1 1027 ? `team-memory-sync: pushed ${filesUploaded} of ${localHashes.size} files in ${batches.length} batches` 1028 : `team-memory-sync: pushed ${filesUploaded} of ${localHashes.size} files (delta)`, 1029 { level: 'info' }, 1030 ) 1031 logPush(startTime, { 1032 success: true, 1033 filesUploaded, 1034 conflict: sawConflict, 1035 conflictRetries, 1036 putBatches: batches.length > 1 ? batches.length : undefined, 1037 }) 1038 return { 1039 success: true, 1040 filesUploaded, 1041 checksum: result.checksum, 1042 ...(skippedSecrets.length > 0 && { skippedSecrets }), 1043 } 1044 } 1045 1046 if (!result.conflict) { 1047 // If the server returned a structured 413 with its effective 1048 // max_entries (anthropic/anthropic#293258), cache it so the next push 1049 // trims to the right cap. The server may GB-tune this per-org. 1050 // This push still fails — re-trimming mid-push would require re-reading 1051 // local entries and re-computing the delta, and we'd need 1052 // soft_delete_keys to shrink below current server count anyway. 1053 if (result.serverMaxEntries !== undefined) { 1054 state.serverMaxEntries = result.serverMaxEntries 1055 logForDebugging( 1056 `team-memory-sync: learned server max_entries=${result.serverMaxEntries} from 413; next push will truncate to this`, 1057 { level: 'warn' }, 1058 ) 1059 } 1060 // filesUploaded may be nonzero if earlier batches committed before this 1061 // one failed. Those keys ARE on the server; the push is a failure 1062 // because it's incomplete, but we don't re-upload them on retry 1063 // (serverChecksums was updated). 1064 logPush(startTime, { 1065 success: false, 1066 filesUploaded, 1067 conflictRetries, 1068 putBatches: batches.length > 1 ? 
batches.length : undefined, 1069 errorType: result.errorType, 1070 status: result.httpStatus, 1071 // Datadog: filter @error_code:team_memory_too_many_entries to track 1072 // too-many-files rejections distinct from gateway/unstructured 413s 1073 errorCode: result.serverErrorCode, 1074 serverMaxEntries: result.serverMaxEntries, 1075 serverReceivedEntries: result.serverReceivedEntries, 1076 }) 1077 return { 1078 success: false, 1079 filesUploaded, 1080 error: result.error, 1081 errorType: result.errorType, 1082 httpStatus: result.httpStatus, 1083 } 1084 } 1085 1086 // 412 conflict — refresh serverChecksums and retry with a tighter delta. 1087 sawConflict = true 1088 if (conflictAttempt >= MAX_CONFLICT_RETRIES) { 1089 logForDebugging( 1090 `team-memory-sync: giving up after ${MAX_CONFLICT_RETRIES} conflict retries`, 1091 { level: 'warn' }, 1092 ) 1093 logPush(startTime, { 1094 success: false, 1095 conflict: true, 1096 conflictRetries, 1097 errorType: 'conflict', 1098 }) 1099 return { 1100 success: false, 1101 filesUploaded: 0, 1102 conflict: true, 1103 error: 'Conflict resolution failed after retries', 1104 } 1105 } 1106 1107 conflictRetries++ 1108 1109 logForDebugging( 1110 `team-memory-sync: conflict (412), probing server hashes (attempt ${conflictAttempt + 1}/${MAX_CONFLICT_RETRIES})`, 1111 { level: 'info' }, 1112 ) 1113 1114 // Cheap probe: fetch only per-key checksums, no entry bodies. Refreshes 1115 // serverChecksums so the next iteration's delta drops any keys a teammate just 1116 // pushed with identical content. 1117 const probe = await fetchTeamMemoryHashes(state, repoSlug) 1118 if (!probe.success || !probe.entryChecksums) { 1119 // Requires anthropic/anthropic#283027. A transient probe failure here is 1120 // fine: the push is failed and the watcher will retry on the next edit. 
1121 logPush(startTime, { 1122 success: false, 1123 conflict: true, 1124 conflictRetries, 1125 errorType: 'conflict', 1126 }) 1127 return { 1128 success: false, 1129 filesUploaded: 0, 1130 conflict: true, 1131 error: `Conflict resolution hashes probe failed: ${probe.error}`, 1132 } 1133 } 1134 state.serverChecksums.clear() 1135 for (const [key, hash] of Object.entries(probe.entryChecksums)) { 1136 state.serverChecksums.set(key, hash) 1137 } 1138 } 1139 1140 logPush(startTime, { success: false, conflictRetries }) 1141 return { 1142 success: false, 1143 filesUploaded: 0, 1144 error: 'Unexpected end of conflict resolution loop', 1145 } 1146} 1147 1148/** 1149 * Bidirectional sync: pull from server, merge with local, push back. 1150 * Server entries take precedence on conflict (last-write-wins by the server). 1151 * Push uses conflict resolution (retries on 412) via pushTeamMemory. 1152 */ 1153export async function syncTeamMemory(state: SyncState): Promise<{ 1154 success: boolean 1155 filesPulled: number 1156 filesPushed: number 1157 error?: string 1158}> { 1159 // 1. Pull remote → local (skip ETag cache for full sync) 1160 const pullResult = await pullTeamMemory(state, { skipEtagCache: true }) 1161 if (!pullResult.success) { 1162 return { 1163 success: false, 1164 filesPulled: 0, 1165 filesPushed: 0, 1166 error: pullResult.error, 1167 } 1168 } 1169 1170 // 2. 
Push local → remote (with conflict resolution) 1171 const pushResult = await pushTeamMemory(state) 1172 if (!pushResult.success) { 1173 return { 1174 success: false, 1175 filesPulled: pullResult.filesWritten, 1176 filesPushed: 0, 1177 error: pushResult.error, 1178 } 1179 } 1180 1181 logForDebugging( 1182 `team-memory-sync: synced (pulled ${pullResult.filesWritten}, pushed ${pushResult.filesUploaded})`, 1183 { level: 'info' }, 1184 ) 1185 1186 return { 1187 success: true, 1188 filesPulled: pullResult.filesWritten, 1189 filesPushed: pushResult.filesUploaded, 1190 } 1191} 1192 1193// ─── Telemetry helpers ─────────────────────────────────────── 1194 1195function logPull( 1196 startTime: number, 1197 outcome: { 1198 success: boolean 1199 filesWritten?: number 1200 notModified?: boolean 1201 errorType?: string 1202 status?: number 1203 }, 1204): void { 1205 logEvent('tengu_team_mem_sync_pull', { 1206 success: outcome.success, 1207 files_written: outcome.filesWritten ?? 0, 1208 not_modified: outcome.notModified ?? false, 1209 duration_ms: Date.now() - startTime, 1210 ...(outcome.errorType && { 1211 errorType: 1212 outcome.errorType as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 1213 }), 1214 ...(outcome.status && { status: outcome.status }), 1215 }) 1216} 1217 1218function logPush( 1219 startTime: number, 1220 outcome: { 1221 success: boolean 1222 filesUploaded?: number 1223 conflict?: boolean 1224 conflictRetries?: number 1225 errorType?: string 1226 status?: number 1227 putBatches?: number 1228 errorCode?: string 1229 serverMaxEntries?: number 1230 serverReceivedEntries?: number 1231 }, 1232): void { 1233 logEvent('tengu_team_mem_sync_push', { 1234 success: outcome.success, 1235 files_uploaded: outcome.filesUploaded ?? 0, 1236 conflict: outcome.conflict ?? false, 1237 conflict_retries: outcome.conflictRetries ?? 
0, 1238 duration_ms: Date.now() - startTime, 1239 ...(outcome.errorType && { 1240 errorType: 1241 outcome.errorType as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 1242 }), 1243 ...(outcome.status && { status: outcome.status }), 1244 ...(outcome.putBatches && { put_batches: outcome.putBatches }), 1245 ...(outcome.errorCode && { 1246 error_code: 1247 outcome.errorCode as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS, 1248 }), 1249 ...(outcome.serverMaxEntries !== undefined && { 1250 server_max_entries: outcome.serverMaxEntries, 1251 }), 1252 ...(outcome.serverReceivedEntries !== undefined && { 1253 server_received_entries: outcome.serverReceivedEntries, 1254 }), 1255 }) 1256}