personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: recover from LimitOverrunError in CLI stdout processing

When a JSONL line from a CLI subprocess exceeds the asyncio StreamReader
1 MB buffer limit, _process_stdout crashed with LimitOverrunError. The
async for loop is replaced with a while True + explicit readline() loop
that catches LimitOverrunError, drains the remainder of the oversized
line, emits a synthetic tool_end event so the agent can retry with a
narrower query, and continues processing. Normal-sized lines are
unaffected.

+138 -1
+107
tests/test_cli_provider.py
··· 366 366 assert "Please authenticate first" in error_events[0]["error"] 367 367 368 368 369 + _OVERSIZE = object() # sentinel for oversize line in _MockStdoutWithOversize 370 + 371 + 372 + class _MockStdoutWithOversize: 373 + """Stdout mock that raises LimitOverrunError on a specific readline() call.""" 374 + 375 + def __init__(self, lines: list): 376 + # lines entries are either bytes or the sentinel OVERSIZE 377 + self._lines = lines 378 + self._index = 0 379 + self._draining_oversize = False 380 + 381 + async def readline(self): 382 + if self._draining_oversize: 383 + self._draining_oversize = False 384 + return b"x" * 1024 * 1024 + b"\n" 385 + if self._index >= len(self._lines): 386 + return b"" 387 + entry = self._lines[self._index] 388 + self._index += 1 389 + if entry is _OVERSIZE: 390 + self._draining_oversize = True 391 + raise asyncio.LimitOverrunError( 392 + "Separator is not found, and chunk exceed the limit", 1024 * 1024 393 + ) 394 + return entry 395 + 396 + async def readexactly(self, n: int) -> bytes: 397 + return b"x" * n 398 + 399 + def __aiter__(self): 400 + return self 401 + 402 + async def __anext__(self): 403 + val = await self.readline() 404 + if val == b"": 405 + raise StopAsyncIteration 406 + return val 407 + 408 + 409 + class TestCLIRunnerOversizedOutput: 410 + """CLIRunner recovers from LimitOverrunError in the stdout loop.""" 411 + 412 + def test_oversized_line_emits_tool_end_and_continues(self): 413 + """Oversize line → synthetic tool_end emitted + subsequent line processed.""" 414 + import json 415 + 416 + normal_line_1 = json.dumps({"event": "text", "text": "hello"}).encode() + b"\n" 417 + normal_line_2 = json.dumps({"event": "text", "text": "world"}).encode() + b"\n" 418 + 419 + events = [] 420 + callback = JSONEventCallback(events.append) 421 + aggregator = ThinkingAggregator(callback, model="test-model") 422 + 423 + process = AsyncMock() 424 + process.stdout = _MockStdoutWithOversize( 425 + [ 426 + normal_line_1, 427 + _OVERSIZE, 428 + normal_line_2, 429 + ] 430 + ) 431 + process.stderr = _MockStderr([]) 432 + process.stdin = AsyncMock() 433 + process.stdin.write = lambda _data: None 434 + process.stdin.close = lambda: None 435 + process.kill = lambda: None 436 + process.wait = AsyncMock(return_value=0) 437 + 438 + # translate just forwards text events as-is 439 + def translate(event_data, agg, cb): 440 + if event_data.get("event") == "text": 441 + cb.emit({"event": "text", "text": event_data["text"]}) 442 + return None 443 + 444 + runner = CLIRunner( 445 + cmd=["fakecli", "--json"], 446 + prompt_text="test", 447 + translate=translate, 448 + callback=callback, 449 + aggregator=aggregator, 450 + ) 451 + 452 + with ( 453 + patch( 454 + "think.providers.cli.asyncio.create_subprocess_exec", 455 + AsyncMock(return_value=process), 456 + ), 457 + patch("think.providers.cli.shutil.which", return_value="/usr/bin/fakecli"), 458 + ): 459 + asyncio.run(runner.run()) 460 + 461 + event_types = [e["event"] for e in events] 462 + # tool_end should be emitted 463 + assert "tool_end" in event_types, f"Expected tool_end in events: {events}" 464 + 465 + # the tool_end result should indicate truncation 466 + tool_end_events = [e for e in events if e["event"] == "tool_end"] 467 + assert len(tool_end_events) == 1 468 + assert "truncated" in tool_end_events[0]["result"] 469 + 470 + # the normal line after the oversize error should also be processed 471 + text_events = [e for e in events if e["event"] == "text"] 472 + texts = [e["text"] for e in text_events] 473 + assert "world" in texts, f"Expected 'world' in text events: {texts}" 474 + 475 + 369 476 # --------------------------------------------------------------------------- 370 477 # safe_raw 371 478 # ---------------------------------------------------------------------------
+31 -1
think/providers/cli.py
··· 29 29 _PROJECT_ROOT = Path(__file__).parent.parent.parent 30 30 31 31 32 + async def _drain_line(stream: asyncio.StreamReader) -> None: 33 + """Drain a single overlong line from the stream by consuming it in chunks.""" 34 + while True: 35 + try: 36 + await stream.readline() 37 + return 38 + except asyncio.LimitOverrunError as exc: 39 + await stream.readexactly(exc.consumed) 40 + 41 + 32 42 # --------------------------------------------------------------------------- 33 43 # Prompt Assembly 34 44 # --------------------------------------------------------------------------- ··· 337 347 return 338 348 _process_line(first_line) 339 349 340 - async for raw_line in process.stdout: 350 + while True: 351 + try: 352 + raw_line = await process.stdout.readline() 353 + except asyncio.LimitOverrunError as exc: 354 + LOG.warning( 355 + "CLI stdout line exceeded buffer limit (%d bytes consumed before limit); " 356 + "draining and emitting truncated tool_end", 357 + exc.consumed, 358 + ) 359 + await _drain_line(process.stdout) 360 + self.callback.emit( 361 + { 362 + "event": "tool_end", 363 + "tool": "bash", 364 + "result": "[output truncated: too large to process — try a more targeted query]", 365 + "ts": now_ms(), 366 + } 367 + ) 368 + continue 369 + if not raw_line: 370 + break 341 371 _process_line(raw_line) 342 372 343 373