personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

providers: raise quota exhaustion before fallback

Classify Gemini CLI quota signals from stderr or stdout as QuotaExhaustedError, tear down the CLI process group on every exit path, and let provider generic handlers re-raise quota errors so talent fallback can handle them. Cortex now treats only terminal error events as stream-ending, allowing the pre-fallback quota event to be broadcast without closing the active fallback run.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+314 -144
+37 -4
tests/test_cli_provider.py
··· 13 13 14 14 from think.providers.cli import ( 15 15 CLIRunner, 16 + QuotaExhaustedError, 16 17 ThinkingAggregator, 17 18 assemble_prompt, 18 19 build_cogitate_env, ··· 286 287 class TestCLIRunnerExitCode: 287 288 """Tests for CLIRunner handling of non-zero exit codes.""" 288 289 289 - def test_nonzero_exit_no_output_raises(self): 290 - """CLI exits with error and no result → RuntimeError with stderr.""" 290 + def test_quota_exhausted_stderr_raises_quota_error(self): 291 + """CLI quota stderr raises QuotaExhaustedError before generic exit handling.""" 291 292 events = [] 292 293 callback = JSONEventCallback(events.append) 293 294 aggregator = ThinkingAggregator(callback, model="test-model") 294 295 295 296 process = _make_process( 296 297 stdout_lines=[], 297 - stderr_lines=[b"TerminalQuotaError: quota exhausted\n"], 298 + stderr_lines=[ 299 + b'TerminalQuotaError: quota exhausted {"retryDelayMs": 120000}\n' 300 + ], 298 301 return_code=1, 299 302 ) 300 303 ··· 312 315 AsyncMock(return_value=process), 313 316 ), 314 317 patch("think.providers.cli.shutil.which", return_value="/usr/bin/fakecli"), 315 - pytest.raises(RuntimeError, match="quota exhausted"), 318 + pytest.raises(QuotaExhaustedError, match="quota exhausted") as exc_info, 316 319 ): 317 320 asyncio.run(runner.run()) 318 321 322 + assert exc_info.value.retry_delay_ms == 120000 319 323 # CLIRunner should NOT emit error events — that's the caller's job 320 324 error_events = [e for e in events if e.get("event") == "error"] 321 325 assert len(error_events) == 0 326 + 327 + def test_quota_exhausted_stdout_raises_quota_error(self): 328 + events = [] 329 + callback = JSONEventCallback(events.append) 330 + aggregator = ThinkingAggregator(callback, model="test-model") 331 + process = _make_process( 332 + stdout_lines=[b'{"error":"QUOTA_EXHAUSTED","retryDelayMs":42}\n'], 333 + stderr_lines=[], 334 + return_code=1, 335 + ) 336 + runner = CLIRunner( 337 + cmd=["fakecli", "--json"], 338 + prompt_text="test", 339 + translate=lambda _e, _a, _c: None, 340 + callback=callback, 341 + aggregator=aggregator, 342 + ) 343 + 344 + with ( 345 + patch( 346 + "think.providers.cli.asyncio.create_subprocess_exec", 347 + AsyncMock(return_value=process), 348 + ), 349 + patch("think.providers.cli.shutil.which", return_value="/usr/bin/fakecli"), 350 + pytest.raises(QuotaExhaustedError) as exc_info, 351 + ): 352 + asyncio.run(runner.run()) 353 + 354 + assert exc_info.value.retry_delay_ms == 42 322 355 323 356 def test_nonzero_exit_with_output_returns_result(self): 324 357 """CLI exits with error but produced output → return result + warning."""
+10 -4
think/cortex.py
··· 468 468 provider 469 469 ) 470 470 471 - # Handle finish or error event 472 - if event.get("event") in ["finish", "error"]: 471 + # Handle finish or terminal error event 472 + terminal_error = event.get("event") == "error" and event.get( 473 + "terminal", True 474 + ) 475 + if event.get("event") == "finish" or terminal_error: 473 476 # Check for output (only on finish) 474 477 if event.get("event") == "finish": 475 478 result = event.get("result", "") ··· 605 608 self.logger.warning(f"Failed to write stderr event: {e}") 606 609 607 610 def _has_finish_event(self, file_path: Path) -> bool: 608 - """Check if the JSONL file contains a finish or error event.""" 611 + """Check if the JSONL file contains a finish or terminal error event.""" 609 612 try: 610 613 with open(file_path, "r") as f: 611 614 for line in f: 612 615 try: 613 616 event = json.loads(line) 614 - if event.get("event") in ["finish", "error"]: 617 + terminal_error = event.get("event") == "error" and event.get( 618 + "terminal", True 619 + ) 620 + if event.get("event") == "finish" or terminal_error: 615 621 return True 616 622 except json.JSONDecodeError as exc: 617 623 self.logger.warning(
+3
think/providers/anthropic.py
··· 53 53 54 54 from .cli import ( 55 55 CLIRunner, 56 + QuotaExhaustedError, 56 57 ThinkingAggregator, 57 58 assemble_prompt, 58 59 build_cogitate_env, ··· 316 317 ) 317 318 318 319 return result 320 + except QuotaExhaustedError: 321 + raise 319 322 except Exception as exc: 320 323 callback.emit( 321 324 {
+239 -134
think/providers/cli.py
··· 17 17 import json 18 18 import logging 19 19 import os 20 + import re 20 21 import shutil 22 + import signal 21 23 from pathlib import Path 22 24 from typing import Any, Callable 23 25 ··· 28 30 29 31 _PROJECT_ROOT = Path(__file__).parent.parent.parent 30 32 _TIMEOUT_LOG_DIR: Path = Path("/tmp") 33 + 34 + _QUOTA_TOKENS = ("QUOTA_EXHAUSTED", "TerminalQuotaError") 35 + _RETRY_DELAY_RE = re.compile(r'"?retryDelayMs"?\s*[:=]\s*"?([0-9]+(?:\.[0-9]+)?)') 36 + 37 + 38 + class QuotaExhaustedError(Exception): 39 + """Raised when a provider CLI reports quota exhaustion.""" 40 + 41 + def __init__(self, message: str, retry_delay_ms: int | None = None) -> None: 42 + super().__init__(message) 43 + self.retry_delay_ms = retry_delay_ms 44 + 45 + 46 + def _quota_error_from_text(text: str) -> QuotaExhaustedError | None: 47 + if not any(token in text for token in _QUOTA_TOKENS): 48 + return None 49 + 50 + retry_delay_ms: int | None = None 51 + match = _RETRY_DELAY_RE.search(text) 52 + if match: 53 + retry_delay_ms = int(float(match.group(1))) 54 + 55 + message = text.strip() or "Provider quota exhausted" 56 + return QuotaExhaustedError(message, retry_delay_ms) 31 57 32 58 33 59 async def _drain_line(stream: asyncio.StreamReader) -> None: ··· 210 236 self.first_event_timeout = first_event_timeout 211 237 self._timed_out_waiting_for_first_event = False 212 238 self._already_retried_first_event: bool = False 239 + self._quota_error: QuotaExhaustedError | None = None 213 240 self.cli_session_id: str | None = None 214 241 215 242 async def run(self) -> str: ··· 231 258 232 259 LOG.info("Spawning CLI: %s (cwd=%s)", " ".join(self.cmd), self.cwd) 233 260 234 - process = await asyncio.create_subprocess_exec( 261 + self._quota_error = None 262 + process: asyncio.subprocess.Process | None = None 263 + stderr_task: asyncio.Task[None] | None = None 264 + stderr_lines: list[str] = [] 265 + self._timed_out_waiting_for_first_event = False 266 + 267 + try: 268 + process = await self._spawn_process(proc_env) 269 + self._send_prompt(process) 270 + stderr_task = asyncio.create_task( 271 + self._read_stderr(process, stderr_lines, binary) 272 + ) 273 + 274 + try: 275 + try: 276 + await asyncio.wait_for( 277 + self._process_stdout(process), 278 + timeout=self.timeout, 279 + ) 280 + except asyncio.TimeoutError: 281 + if ( 282 + self._timed_out_waiting_for_first_event 283 + and not self._already_retried_first_event 284 + ): 285 + LOG.warning( 286 + "CLI first-event timed out after %ss, retrying once", 287 + self.first_event_timeout, 288 + ) 289 + self._already_retried_first_event = True 290 + try: 291 + process.kill() 292 + except ProcessLookupError: 293 + pass 294 + await self._terminate_process_group(process) 295 + await stderr_task 296 + self._write_timeout_log( 297 + which_timeout="first_event", 298 + timeout_seconds=self.first_event_timeout, 299 + proc_env=proc_env, 300 + cmd=self.cmd, 301 + cwd=str(self.cwd), 302 + stderr_lines=stderr_lines, 303 + ) 304 + 305 + process = await self._spawn_process(proc_env) 306 + self._send_prompt(process) 307 + 308 + stderr_lines = [] 309 + stderr_task = asyncio.create_task( 310 + self._read_stderr(process, stderr_lines, binary) 311 + ) 312 + self._timed_out_waiting_for_first_event = False 313 + await asyncio.wait_for( 314 + self._process_stdout(process), 315 + timeout=self.timeout, 316 + ) 317 + else: 318 + raise 319 + except asyncio.TimeoutError: 320 + timeout_seconds = ( 321 + self.first_event_timeout 322 + if self._timed_out_waiting_for_first_event 323 + else self.timeout 324 + ) 325 + which_timeout = ( 326 + "first_event" 327 + if self._timed_out_waiting_for_first_event 328 + else "full_run" 329 + ) 330 + LOG.error("CLI process timed out after %ss, killing", timeout_seconds) 331 + try: 332 + process.kill() 333 + except ProcessLookupError: 334 + pass 335 + await self._terminate_process_group(process) 336 + await stderr_task 337 + self._write_timeout_log( 338 + which_timeout=which_timeout, 339 + timeout_seconds=timeout_seconds, 340 + proc_env=proc_env, 341 + cmd=self.cmd, 342 + cwd=str(self.cwd), 343 + stderr_lines=stderr_lines, 344 + ) 345 + stderr_tail = "\n".join(stderr_lines[-20:]) 346 + error_message = ( 347 + f"CLI process timed out after {timeout_seconds}s. " 348 + f"Stderr tail:\n{stderr_tail}\n" 349 + "Check that the CLI tool is installed and authenticated." 350 + ) 351 + self.callback.emit( 352 + { 353 + "event": "error", 354 + "error": error_message, 355 + "ts": now_ms(), 356 + } 357 + ) 358 + raise RuntimeError(error_message) 359 + 360 + if self._quota_error: 361 + raise self._quota_error 362 + 363 + await stderr_task 364 + if self._quota_error: 365 + raise self._quota_error 366 + 367 + return_code = await process.wait() 368 + result = self.aggregator.flush_as_result() 369 + 370 + if return_code != 0: 371 + stderr_text = "\n".join(stderr_lines[-20:]) # Last 20 lines 372 + if result: 373 + # CLI failed but produced output — warn and return what we got 374 + LOG.warning( 375 + "CLI process exited with code %d but produced output. Stderr: %s", 376 + return_code, 377 + stderr_text, 378 + ) 379 + self.callback.emit( 380 + { 381 + "event": "warning", 382 + "message": f"CLI exited with code {return_code}", 383 + "stderr": stderr_text, 384 + "ts": now_ms(), 385 + } 386 + ) 387 + else: 388 + # CLI failed with no output — this is an error. 389 + # Don't emit error event here; the caller's exception 390 + # handler is responsible for error event emission. 391 + LOG.error( 392 + "CLI process exited with code %d: %s", 393 + return_code, 394 + stderr_text, 395 + ) 396 + raise RuntimeError( 397 + f"CLI process exited with code {return_code}. Stderr: {stderr_text}" 398 + ) 399 + 400 + return result 401 + finally: 402 + if process and process.returncode is None: 403 + await self._terminate_process_group(process) 404 + if stderr_task: 405 + await stderr_task 406 + 407 + async def _spawn_process( 408 + self, proc_env: dict[str, str] 409 + ) -> asyncio.subprocess.Process: 410 + return await asyncio.create_subprocess_exec( 235 411 *self.cmd, 236 412 stdin=asyncio.subprocess.PIPE, 237 413 stdout=asyncio.subprocess.PIPE, 238 414 stderr=asyncio.subprocess.PIPE, 239 - limit=1024 * 1024, # 1 MB – tool results can exceed the 64 KB default 415 + limit=1024 * 1024, 240 416 cwd=str(self.cwd), 241 417 env=proc_env, 418 + start_new_session=True, 242 419 ) 243 420 244 - # Pipe prompt to stdin and close 421 + def _send_prompt(self, process: asyncio.subprocess.Process) -> None: 245 422 if process.stdin: 246 423 process.stdin.write(self.prompt_text.encode("utf-8")) 247 424 process.stdin.close() 248 425 249 - # Read stdout line by line, translate each JSONL event 250 - stderr_lines: list[str] = [] 251 - 252 - async def _read_stderr() -> None: 253 - if not process.stderr: 426 + async def _read_stderr( 427 + self, 428 + process: asyncio.subprocess.Process, 429 + stderr_lines: list[str], 430 + binary: str, 431 + ) -> None: 432 + if not process.stderr: 433 + return 434 + async for raw_line in process.stderr: 435 + line = raw_line.decode("utf-8", errors="replace").rstrip() 436 + if not line: 437 + continue 438 + stderr_lines.append(line) 439 + LOG.debug("[%s stderr] %s", binary, line) 440 + quota_error = _quota_error_from_text(line) 441 + if quota_error: 442 + self._quota_error = quota_error 443 + await self._terminate_process_group(process) 254 444 return 255 - async for raw_line in process.stderr: 256 - line = raw_line.decode("utf-8", errors="replace").rstrip() 257 - if line: 258 - stderr_lines.append(line) 259 - LOG.debug("[%s stderr] %s", binary, line) 260 445 261 - stderr_task = asyncio.create_task(_read_stderr()) 262 - self._timed_out_waiting_for_first_event = False 446 + async def _terminate_process_group( 447 + self, 448 + process: asyncio.subprocess.Process, 449 + *, 450 + grace_seconds: float = 2.0, 451 + ) -> None: 452 + if process.returncode is not None: 453 + return 263 454 264 455 try: 456 + pgid = os.getpgid(process.pid) 457 + except ProcessLookupError: 458 + pgid = None 459 + 460 + if pgid is not None: 265 461 try: 266 - await asyncio.wait_for( 267 - self._process_stdout(process), 268 - timeout=self.timeout, 269 - ) 270 - except asyncio.TimeoutError: 271 - if ( 272 - self._timed_out_waiting_for_first_event 273 - and not self._already_retried_first_event 274 - ): 275 - LOG.warning( 276 - "CLI first-event timed out after %ss, retrying once", 277 - self.first_event_timeout, 278 - ) 279 - self._already_retried_first_event = True 280 - process.kill() 281 - await stderr_task 282 - self._write_timeout_log( 283 - which_timeout="first_event", 284 - timeout_seconds=self.first_event_timeout, 285 - proc_env=proc_env, 286 - cmd=self.cmd, 287 - cwd=str(self.cwd), 288 - stderr_lines=stderr_lines, 289 - ) 462 + os.killpg(pgid, signal.SIGTERM) 463 + except ProcessLookupError: 464 + pass 465 + else: 466 + try: 467 + process.terminate() 468 + except ProcessLookupError: 469 + pass 290 470 291 - process = await asyncio.create_subprocess_exec( 292 - *self.cmd, 293 - stdin=asyncio.subprocess.PIPE, 294 - stdout=asyncio.subprocess.PIPE, 295 - stderr=asyncio.subprocess.PIPE, 296 - limit=1024 * 1024, 297 - cwd=str(self.cwd), 298 - env=proc_env, 299 - ) 471 + try: 472 + await asyncio.wait_for(process.wait(), timeout=grace_seconds) 473 + return 474 + except asyncio.TimeoutError: 475 + pass 300 476 301 - if process.stdin: 302 - process.stdin.write(self.prompt_text.encode("utf-8")) 303 - process.stdin.close() 304 - 305 - stderr_lines = [] 306 - stderr_task = asyncio.create_task(_read_stderr()) 307 - self._timed_out_waiting_for_first_event = False 308 - await asyncio.wait_for( 309 - self._process_stdout(process), 310 - timeout=self.timeout, 311 - ) 312 - else: 313 - raise 314 - except asyncio.TimeoutError: 315 - timeout_seconds = ( 316 - self.first_event_timeout 317 - if self._timed_out_waiting_for_first_event 318 - else self.timeout 319 - ) 320 - which_timeout = ( 321 - "first_event" if self._timed_out_waiting_for_first_event else "full_run" 322 - ) 323 - LOG.error("CLI process timed out after %ss, killing", timeout_seconds) 477 + if pgid is not None: 478 + try: 479 + os.killpg(pgid, signal.SIGKILL) 480 + except ProcessLookupError: 481 + pass 482 + try: 324 483 process.kill() 325 - await stderr_task 326 - self._write_timeout_log( 327 - which_timeout=which_timeout, 328 - timeout_seconds=timeout_seconds, 329 - proc_env=proc_env, 330 - cmd=self.cmd, 331 - cwd=str(self.cwd), 332 - stderr_lines=stderr_lines, 333 - ) 334 - stderr_tail = "\n".join(stderr_lines[-20:]) 335 - error_message = ( 336 - f"CLI process timed out after {timeout_seconds}s. " 337 - f"Stderr tail:\n{stderr_tail}\n" 338 - "Check that the CLI tool is installed and authenticated." 339 - ) 340 - self.callback.emit( 341 - { 342 - "event": "error", 343 - "error": error_message, 344 - "ts": now_ms(), 345 - } 346 - ) 347 - raise RuntimeError(error_message) 348 - finally: 349 - # Wait for stderr reader to finish 350 - if not stderr_task.done(): 351 - await stderr_task 352 - 353 - # Wait for process to exit 354 - return_code = await process.wait() 355 - result = self.aggregator.flush_as_result() 356 - 357 - if return_code != 0: 358 - stderr_text = "\n".join(stderr_lines[-20:]) # Last 20 lines 359 - if result: 360 - # CLI failed but produced output — warn and return what we got 361 - LOG.warning( 362 - "CLI process exited with code %d but produced output. Stderr: %s", 363 - return_code, 364 - stderr_text, 365 - ) 366 - self.callback.emit( 367 - { 368 - "event": "warning", 369 - "message": f"CLI exited with code {return_code}", 370 - "stderr": stderr_text, 371 - "ts": now_ms(), 372 - } 373 - ) 374 - else: 375 - # CLI failed with no output — this is an error. 376 - # Don't emit error event here; the caller's exception 377 - # handler is responsible for error event emission. 378 - LOG.error( 379 - "CLI process exited with code %d: %s", 380 - return_code, 381 - stderr_text, 382 - ) 383 - raise RuntimeError( 384 - f"CLI process exited with code {return_code}. Stderr: {stderr_text}" 385 - ) 386 - 387 - return result 484 + except ProcessLookupError: 485 + pass 486 + await process.wait() 388 487 389 488 async def _process_stdout(self, process: asyncio.subprocess.Process) -> None: 390 489 """Read and translate JSONL lines from stdout.""" ··· 395 494 line = raw_line.decode("utf-8", errors="replace").strip() 396 495 if not line: 397 496 return 497 + 498 + quota_error = _quota_error_from_text(line) 499 + if quota_error: 500 + self._quota_error = quota_error 501 + raise quota_error 398 502 399 503 try: 400 504 event_data = json.loads(line) ··· 630 734 631 735 __all__ = [ 632 736 "CLIRunner", 737 + "QuotaExhaustedError", 633 738 "ThinkingAggregator", 634 739 "assemble_prompt", 635 740 "build_cogitate_env",
+3
think/providers/google.py
··· 45 45 46 46 from .cli import ( 47 47 CLIRunner, 48 + QuotaExhaustedError, 48 49 ThinkingAggregator, 49 50 assemble_prompt, 50 51 build_cogitate_env, ··· 819 820 finish_event["cli_session_id"] = runner.cli_session_id 820 821 callback.emit(finish_event) 821 822 return result 823 + except QuotaExhaustedError: 824 + raise 822 825 except Exception as exc: 823 826 callback.emit( 824 827 {
+3
think/providers/openai.py
··· 43 43 from think.models import GPT_5, OPENAI_EFFORT_SUFFIXES 44 44 from think.providers.cli import ( 45 45 CLIRunner, 46 + QuotaExhaustedError, 46 47 ThinkingAggregator, 47 48 assemble_prompt, 48 49 build_cogitate_env, ··· 224 225 225 226 try: 226 227 result = await runner.run() 228 + except QuotaExhaustedError: 229 + raise 227 230 except Exception as exc: 228 231 if not getattr(exc, "_evented", False): 229 232 cb.emit(
+19 -2
think/talents.py
··· 28 28 from typing import Any, Callable, Optional 29 29 30 30 from think.cluster import cluster, cluster_period, cluster_span 31 + from think.providers.cli import QuotaExhaustedError 31 32 from think.providers.shared import Event 32 33 from think.talent import ( 33 34 get_output_path, ··· 809 810 FileNotFoundError, 810 811 PermissionError, 811 812 NotImplementedError, 813 + QuotaExhaustedError, 812 814 ) 813 815 814 816 ··· 819 821 Returns True for everything else (SDK connection, timeout, server errors). 820 822 """ 821 823 return not isinstance(exc, _NON_RETRYABLE_ERRORS) 824 + 825 + 826 + def _should_fallback(exc: Exception) -> bool: 827 + return _is_retryable_error(exc) or isinstance(exc, QuotaExhaustedError) 822 828 823 829 824 830 async def _execute_with_tools( ··· 861 867 try: 862 868 await provider_mod.run_cogitate(config=config, on_event=talent_emit_event) 863 869 except Exception as exc: 864 - if not _is_retryable_error(exc) or config.get("fallback_from"): 870 + if config.get("fallback_from") or not _should_fallback(exc): 865 871 raise 872 + if isinstance(exc, QuotaExhaustedError): 873 + reset_at_ms = now_ms() + (exc.retry_delay_ms or 0) 874 + emit_event( 875 + { 876 + "event": "error", 877 + "reason": "quota_exhausted", 878 + "error": str(exc), 879 + "reset_at_ms": reset_at_ms, 880 + "terminal": False, 881 + } 882 + ) 866 883 from think.models import ( 867 884 get_backup_provider, 868 885 resolve_model_for_provider, ··· 986 1003 timeout_s=timeout_s, 987 1004 ) 988 1005 except Exception as exc: 989 - if not _is_retryable_error(exc) or config.get("fallback_from"): 1006 + if config.get("fallback_from") or not _should_fallback(exc): 990 1007 raise 991 1008 from think.models import ( 992 1009 get_backup_provider,