personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

cogitate: retry first-event CLI timeouts, bump to 90s

Bump CLIRunner's default first_event_timeout from 30s to 90s.
Healthy gemini CLI pre-init already takes 5-10s, and 90s leaves
headroom for one transient retry/backoff before giving up.
Rationale follows the CTO investigation in
~/projects/extro/cto/workspace/gemini-cogitate-hang-investigation-260426.md.

Retry exactly once on first-event timeout, gated by
_already_retried_first_event. Mid-stream (full-run) timeouts are not
retried; the existing 600s timeout continues to cover that case.

Write /tmp/gemini-cogitate-timeout-<unix_ms>.log on every timeout:
pre-retry, final give-up, and full-run. Each log records cmd, cwd,
sorted env keys without values, which_timeout, and captured stderr,
with mode 0o600. Write failures are logged via LOG.warning and
swallowed so they never mask the underlying RuntimeError. Add the
module-scope _TIMEOUT_LOG_DIR seam so tests can monkeypatch the path.

Keep scope confined to CLIRunner in think/providers/cli.py and its
tests. build_cogitate_env, assemble_prompt, ThinkingAggregator, and
provider-side translate functions are intentionally untouched per the
CTO investigation scope.

Keep the give-up surface unchanged: the emitted error event payload and
the RuntimeError(error_message) text remain byte-identical to prior
behavior.

+362 -13
+259 -5
tests/test_cli_provider.py
··· 242 242 return process 243 243 244 244 245 + class HangingStdout: 246 + async def readline(self): 247 + future = asyncio.get_running_loop().create_future() 248 + return await future 249 + 250 + 251 + class _DelayedStdout: 252 + """Stdout mock that waits before yielding each line.""" 253 + 254 + def __init__(self, lines: list[bytes], delay_seconds: float): 255 + self._lines = lines 256 + self._delay_seconds = delay_seconds 257 + self._index = 0 258 + 259 + async def readline(self): 260 + await asyncio.sleep(self._delay_seconds) 261 + if self._index >= len(self._lines): 262 + return b"" 263 + line = self._lines[self._index] 264 + self._index += 1 265 + return line 266 + 267 + 268 + class _FirstEmitThenHangStdout: 269 + """Stdout mock that emits one line and then hangs forever.""" 270 + 271 + def __init__(self, first_line: bytes, delay_seconds: float = 0.0): 272 + self._first_line = first_line 273 + self._delay_seconds = delay_seconds 274 + self._emitted = False 275 + 276 + async def readline(self): 277 + if not self._emitted: 278 + self._emitted = True 279 + if self._delay_seconds: 280 + await asyncio.sleep(self._delay_seconds) 281 + return self._first_line 282 + future = asyncio.get_running_loop().create_future() 283 + return await future 284 + 285 + 245 286 class TestCLIRunnerExitCode: 246 287 """Tests for CLIRunner handling of non-zero exit codes.""" 247 288 ··· 431 472 callback = JSONEventCallback(events.append) 432 473 aggregator = ThinkingAggregator(callback, model="test-model") 433 474 434 - class HangingStdout: 435 - async def readline(self): 436 - future = asyncio.get_running_loop().create_future() 437 - return await future 438 - 439 475 process = _make_process([], [b"Please authenticate first\n"], 0) 440 476 process.stdout = HangingStdout() # Override with hanging version 441 477 ··· 448 484 timeout=5, 449 485 first_event_timeout=0.1, 450 486 ) 487 + # Force single-shot behavior to keep this test focused on the give-up 488 + # surface; the new retry contract is covered in 489 + # TestCLIRunnerFirstEventRetry. 490 + runner._already_retried_first_event = True 451 491 452 492 with ( 453 493 patch( ··· 466 506 error_events = [event for event in events if event.get("event") == "error"] 467 507 assert len(error_events) == 1 468 508 assert "Please authenticate first" in error_events[0]["error"] 509 + 510 + 511 + class TestCLIRunnerFirstEventRetry: 512 + @staticmethod 513 + def _translate_text_event(event, agg, cb): 514 + if event.get("type") == "text": 515 + agg.accumulate(event["content"]) 516 + cb.emit({"event": "text", "text": event["content"]}) 517 + return None 518 + 519 + def test_short_first_event_timeout_with_slow_first_emit_raises( 520 + self, monkeypatch, tmp_path 521 + ): 522 + monkeypatch.setattr("think.providers.cli._TIMEOUT_LOG_DIR", tmp_path) 523 + events = [] 524 + callback = JSONEventCallback(events.append) 525 + aggregator = ThinkingAggregator(callback, model="test-model") 526 + slow_line = b'{"type": "text", "content": "slow"}\n' 527 + process_one = _make_process([], [], 0) 528 + process_one.stdout = _DelayedStdout([slow_line], delay_seconds=0.5) 529 + process_two = _make_process([], [], 0) 530 + process_two.stdout = _DelayedStdout([slow_line], delay_seconds=0.5) 531 + runner = CLIRunner( 532 + cmd=["fakecli", "--json"], 533 + prompt_text="test", 534 + translate=self._translate_text_event, 535 + callback=callback, 536 + aggregator=aggregator, 537 + timeout=5, 538 + first_event_timeout=0.05, 539 + ) 540 + 541 + with ( 542 + patch( 543 + "think.providers.cli.asyncio.create_subprocess_exec", 544 + AsyncMock(side_effect=[process_one, process_two]), 545 + ) as mock_create, 546 + patch("think.providers.cli.shutil.which", return_value="/usr/bin/fakecli"), 547 + pytest.raises(RuntimeError), 548 + ): 549 + asyncio.run(runner.run()) 550 + 551 + assert mock_create.call_count == 2 552 + 553 + def test_first_event_timeout_with_headroom_succeeds(self, monkeypatch, tmp_path): 554 + monkeypatch.setattr("think.providers.cli._TIMEOUT_LOG_DIR", tmp_path) 555 + events = [] 556 + callback = JSONEventCallback(events.append) 557 + aggregator = ThinkingAggregator(callback, model="test-model") 558 + line = b'{"type": "text", "content": "headroom"}\n' 559 + process = _make_process([], [], 0) 560 + process.stdout = _DelayedStdout([line], delay_seconds=0.05) 561 + runner = CLIRunner( 562 + cmd=["fakecli", "--json"], 563 + prompt_text="test", 564 + translate=self._translate_text_event, 565 + callback=callback, 566 + aggregator=aggregator, 567 + timeout=5, 568 + first_event_timeout=1.0, 569 + ) 570 + 571 + with ( 572 + patch( 573 + "think.providers.cli.asyncio.create_subprocess_exec", 574 + AsyncMock(return_value=process), 575 + ) as mock_create, 576 + patch("think.providers.cli.shutil.which", return_value="/usr/bin/fakecli"), 577 + ): 578 + result = asyncio.run(runner.run()) 579 + 580 + assert result == "headroom" 581 + assert mock_create.call_count == 1 582 + assert runner._already_retried_first_event is False 583 + 584 + def test_first_event_timeout_triggers_one_retry(self, monkeypatch, tmp_path): 585 + monkeypatch.setattr("think.providers.cli._TIMEOUT_LOG_DIR", tmp_path) 586 + events = [] 587 + callback = JSONEventCallback(events.append) 588 + aggregator = ThinkingAggregator(callback, model="test-model") 589 + hanging_proc_1 = _make_process([], [], 0) 590 + hanging_proc_1.stdout = HangingStdout() 591 + hanging_proc_2 = _make_process([], [], 0) 592 + hanging_proc_2.stdout = HangingStdout() 593 + runner = CLIRunner( 594 + cmd=["fakecli", "--json"], 595 + prompt_text="test", 596 + translate=self._translate_text_event, 597 + callback=callback, 598 + aggregator=aggregator, 599 + timeout=5, 600 + first_event_timeout=0.05, 601 + ) 602 + 603 + with ( 604 + patch( 605 + "think.providers.cli.asyncio.create_subprocess_exec", 606 + AsyncMock(side_effect=[hanging_proc_1, hanging_proc_2]), 607 + ) as mock_create, 608 + patch("think.providers.cli.shutil.which", return_value="/usr/bin/fakecli"), 609 + pytest.raises(RuntimeError), 610 + ): 611 + asyncio.run(runner.run()) 612 + 613 + assert mock_create.call_count == 2 614 + assert runner._already_retried_first_event is True 615 + 616 + def test_retry_succeeds_when_second_spawn_emits(self, monkeypatch, tmp_path): 617 + monkeypatch.setattr("think.providers.cli._TIMEOUT_LOG_DIR", tmp_path) 618 + events = [] 619 + callback = JSONEventCallback(events.append) 620 + aggregator = ThinkingAggregator(callback, model="test-model") 621 + hanging_proc = _make_process([], [], 0) 622 + hanging_proc.stdout = HangingStdout() 623 + healthy_proc = _make_process([], [], 0) 624 + healthy_proc.stdout = _DelayedStdout( 625 + [b'{"type": "text", "content": "retry ok"}\n'], 626 + delay_seconds=0.01, 627 + ) 628 + runner = CLIRunner( 629 + cmd=["fakecli", "--json"], 630 + prompt_text="test", 631 + translate=self._translate_text_event, 632 + callback=callback, 633 + aggregator=aggregator, 634 + timeout=5, 635 + first_event_timeout=0.05, 636 + ) 637 + 638 + with ( 639 + patch( 640 + "think.providers.cli.asyncio.create_subprocess_exec", 641 + AsyncMock(side_effect=[hanging_proc, healthy_proc]), 642 + ) as mock_create, 643 + patch("think.providers.cli.shutil.which", return_value="/usr/bin/fakecli"), 644 + ): 645 + result = asyncio.run(runner.run()) 646 + 647 + assert result == "retry ok" 648 + assert mock_create.call_count == 2 649 + assert runner._already_retried_first_event is True 650 + assert [event for event in events if event.get("event") == "text"] 651 + 652 + def test_timeout_log_redacts_env_values_and_prompt(self, monkeypatch, tmp_path): 653 + monkeypatch.setattr("think.providers.cli._TIMEOUT_LOG_DIR", tmp_path) 654 + events = [] 655 + callback = JSONEventCallback(events.append) 656 + aggregator = ThinkingAggregator(callback, model="test-model") 657 + hanging_proc_1 = _make_process([], [], 0) 658 + hanging_proc_1.stdout = HangingStdout() 659 + hanging_proc_2 = _make_process([], [], 0) 660 + hanging_proc_2.stdout = HangingStdout() 661 + runner = CLIRunner( 662 + cmd=["fakecli", "--json"], 663 + prompt_text="prompt-do-not-leak-67890", 664 + translate=self._translate_text_event, 665 + callback=callback, 666 + aggregator=aggregator, 667 + env={"FAKE_KEY": "do-not-leak-me-12345"}, 668 + timeout=5, 669 + first_event_timeout=0.05, 670 + ) 671 + 672 + with ( 673 + patch( 674 + "think.providers.cli.asyncio.create_subprocess_exec", 675 + AsyncMock(side_effect=[hanging_proc_1, hanging_proc_2]), 676 + ), 677 + patch("think.providers.cli.shutil.which", return_value="/usr/bin/fakecli"), 678 + pytest.raises(RuntimeError), 679 + ): 680 + asyncio.run(runner.run()) 681 + 682 + files = list(tmp_path.glob("gemini-cogitate-timeout-*.log")) 683 + assert len(files) == 2 684 + for file_path in files: 685 + content = file_path.read_text() 686 + assert "FAKE_KEY" in content 687 + assert "do-not-leak-me-12345" not in content 688 + assert "prompt-do-not-leak-67890" not in content 689 + assert file_path.stat().st_mode & 0o777 == 0o600 690 + 691 + def test_full_run_timeout_writes_log(self, monkeypatch, tmp_path): 692 + monkeypatch.setattr("think.providers.cli._TIMEOUT_LOG_DIR", tmp_path) 693 + events = [] 694 + callback = JSONEventCallback(events.append) 695 + aggregator = ThinkingAggregator(callback, model="test-model") 696 + process = _make_process([], [], 0) 697 + process.stdout = _FirstEmitThenHangStdout( 698 + b'{"type": "text", "content": "first line"}\n' 699 + ) 700 + runner = CLIRunner( 701 + cmd=["fakecli", "--json"], 702 + prompt_text="test", 703 + translate=self._translate_text_event, 704 + callback=callback, 705 + aggregator=aggregator, 706 + timeout=0.05, 707 + first_event_timeout=1.0, 708 + ) 709 + 710 + with ( 711 + patch( 712 + "think.providers.cli.asyncio.create_subprocess_exec", 713 + AsyncMock(return_value=process), 714 + ), 715 + patch("think.providers.cli.shutil.which", return_value="/usr/bin/fakecli"), 716 + pytest.raises(RuntimeError), 717 + ): 718 + asyncio.run(runner.run()) 719 + 720 + files = list(tmp_path.glob("gemini-cogitate-timeout-*.log")) 721 + assert len(files) == 1 722 + assert "which_timeout: full_run" in files[0].read_text() 469 723 470 724 471 725 _OVERSIZE = object() # sentinel for oversize line in _MockStdoutWithOversize
+103 -8
think/providers/cli.py
··· 27 27 LOG = logging.getLogger("think.providers.cli") 28 28 29 29 _PROJECT_ROOT = Path(__file__).parent.parent.parent 30 + _TIMEOUT_LOG_DIR: Path = Path("/tmp") 30 31 31 32 32 33 async def _drain_line(stream: asyncio.StreamReader) -> None: ··· 180 181 cwd: Working directory for the subprocess. Defaults to project root. 181 182 env: Optional complete environment for the subprocess (used as-is, not merged). When None, inherits os.environ. 182 183 timeout: Subprocess timeout in seconds. Default 600. 183 - first_event_timeout: Timeout for first stdout line in seconds. Default 30. 184 + first_event_timeout: Timeout for first stdout line in seconds. Default 90. 184 185 """ 185 186 186 187 def __init__( ··· 196 197 cwd: Path | None = None, 197 198 env: dict[str, str] | None = None, 198 199 timeout: int = 600, 199 - first_event_timeout: int = 30, 200 + first_event_timeout: int = 90, 200 201 ) -> None: 201 202 self.cmd = cmd 202 203 self.prompt_text = prompt_text ··· 208 209 self.timeout = timeout 209 210 self.first_event_timeout = first_event_timeout 210 211 self._timed_out_waiting_for_first_event = False 212 + self._already_retried_first_event: bool = False 211 213 self.cli_session_id: str | None = None 212 214 213 215 async def run(self) -> str: ··· 224 226 raise RuntimeError( 225 227 f"CLI tool '{binary}' not found. Install it and ensure it's on PATH." 226 228 ) 227 - 228 - import os 229 229 230 230 proc_env = self.env if self.env is not None else os.environ.copy() 231 231 ··· 262 262 self._timed_out_waiting_for_first_event = False 263 263 264 264 try: 265 - await asyncio.wait_for( 266 - self._process_stdout(process), 267 - timeout=self.timeout, 268 - ) 265 + try: 266 + await asyncio.wait_for( 267 + self._process_stdout(process), 268 + timeout=self.timeout, 269 + ) 270 + except asyncio.TimeoutError: 271 + if ( 272 + self._timed_out_waiting_for_first_event 273 + and not self._already_retried_first_event 274 + ): 275 + LOG.warning( 276 + "CLI first-event timed out after %ss, retrying once", 277 + self.first_event_timeout, 278 + ) 279 + self._already_retried_first_event = True 280 + process.kill() 281 + await stderr_task 282 + self._write_timeout_log( 283 + which_timeout="first_event", 284 + timeout_seconds=self.first_event_timeout, 285 + proc_env=proc_env, 286 + cmd=self.cmd, 287 + cwd=str(self.cwd), 288 + stderr_lines=stderr_lines, 289 + ) 290 + 291 + process = await asyncio.create_subprocess_exec( 292 + *self.cmd, 293 + stdin=asyncio.subprocess.PIPE, 294 + stdout=asyncio.subprocess.PIPE, 295 + stderr=asyncio.subprocess.PIPE, 296 + limit=1024 * 1024, 297 + cwd=str(self.cwd), 298 + env=proc_env, 299 + ) 300 + 301 + if process.stdin: 302 + process.stdin.write(self.prompt_text.encode("utf-8")) 303 + process.stdin.close() 304 + 305 + stderr_lines = [] 306 + stderr_task = asyncio.create_task(_read_stderr()) 307 + self._timed_out_waiting_for_first_event = False 308 + await asyncio.wait_for( 309 + self._process_stdout(process), 310 + timeout=self.timeout, 311 + ) 312 + else: 313 + raise 269 314 except asyncio.TimeoutError: 270 315 timeout_seconds = ( 271 316 self.first_event_timeout 272 317 if self._timed_out_waiting_for_first_event 273 318 else self.timeout 319 + ) 320 + which_timeout = ( 321 + "first_event" if self._timed_out_waiting_for_first_event else "full_run" 274 322 ) 275 323 LOG.error("CLI process timed out after %ss, killing", timeout_seconds) 276 324 process.kill() 277 325 await stderr_task 326 + self._write_timeout_log( 327 + which_timeout=which_timeout, 328 + timeout_seconds=timeout_seconds, 329 + proc_env=proc_env, 330 + cmd=self.cmd, 331 + cwd=str(self.cwd), 332 + stderr_lines=stderr_lines, 333 + ) 278 334 stderr_tail = "\n".join(stderr_lines[-20:]) 279 335 error_message = ( 280 336 f"CLI process timed out after {timeout_seconds}s. " ··· 387 443 if not raw_line: 388 444 break 389 445 _process_line(raw_line) 446 + 447 + def _write_timeout_log( 448 + self, 449 + *, 450 + which_timeout: str, 451 + timeout_seconds: int, 452 + proc_env: dict[str, str], 453 + cmd: list[str], 454 + cwd: str | None, 455 + stderr_lines: list[str], 456 + ) -> Path | None: 457 + """Write a postmortem log for a CLI timeout.""" 458 + 459 + timestamp_ms = now_ms() 460 + path = _TIMEOUT_LOG_DIR / f"gemini-cogitate-timeout-{timestamp_ms}.log" 461 + env_keys = ", ".join(sorted(set(proc_env.keys()))) 462 + stderr_text = "\n".join(stderr_lines) 463 + content = "\n".join( 464 + [ 465 + f"timestamp_ms: {timestamp_ms}", 466 + f"which_timeout: {which_timeout}", 467 + f"timeout_seconds: {timeout_seconds}", 468 + (f"already_retried_first_event: {self._already_retried_first_event}"), 469 + f"cmd: {cmd!r}", 470 + f"cwd: {cwd if cwd is not None else 'None'}", 471 + f"env_keys: {env_keys}", 472 + "stderr (full):", 473 + stderr_text, 474 + ] 475 + ) 476 + 477 + try: 478 + with open(path, "w", encoding="utf-8") as log_file: 479 + log_file.write(content) 480 + os.chmod(str(path), 0o600) 481 + except OSError as exc: 482 + LOG.warning("Could not write timeout log to %s: %s", path, exc) 483 + return None 484 + return path 390 485 391 486 392 487 # ---------------------------------------------------------------------------