personal memory agent

providers/anthropic: enforce max_tokens > thinking_budget and auto-stream over threshold

Make run_generate and run_agenerate robust to two latent Anthropic SDK
constraints that surfaced during the L3 sense pilot. Both apply only to
the generate path; cogitate is untouched.

Fix A: when thinking_budget is positive and max_output_tokens falls at
or below thinking_budget + 1000, lift max_tokens rather than clamp the
caller's thinking budget. When thinking is active, the caller's
declared max is a stated output floor; clamping thinking would
silently shrink a deliberate reasoning budget. A logger.info records
the before/after values on each lift. The BadRequestError retry
inherits the adjustment because retry_kwargs is a dict copy of the
adjusted request.
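A minimal sketch of the lift rule (illustration only; it mirrors
_adjust_budget_for_thinking in the diff below, with `buffer` standing
in for _MAX_TOKENS_BUFFER and values taken from the unit tests):

    # Fix A trigger: max_output_tokens falls at or below thinking_budget + 1000.
    thinking_budget = 4096
    max_output_tokens = 4096   # the caller's declared output floor
    buffer = 1000              # _MAX_TOKENS_BUFFER in the diff below

    if max_output_tokens <= thinking_budget + buffer:
        max_tokens = thinking_budget + buffer + 1   # lifted to 5097
    else:
        max_tokens = max_output_tokens

    assert max_tokens == 4096 + 1000 + 1   # same arithmetic the tests assert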

Fix B: route requests through client.messages.stream(...) whenever
max_tokens trips the SDK's non-streaming guard, either by exceeding
the per-model cap MODEL_NONSTREAMING_TOKENS[model] or the time-formula
threshold (60 * 60 * max_tokens / 128_000 > 600, i.e. max_tokens above
~21,333). Downstream extraction is unchanged since ParsedMessage
subclasses Message. MODEL_NONSTREAMING_TOKENS is imported from
anthropic._constants; the SDK itself imports it via the public
messages path (anthropic/resources/messages/messages.py), so the
symbol is stable in practice.
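The 21,333 figure falls out of the formula directly. A sketch of the
dispatch predicate, where `requires_streaming` and `caps` are
illustrative stand-ins for _requires_streaming and
MODEL_NONSTREAMING_TOKENS in the diff below:

    # Solving 60 * 60 * max_tokens / 128_000 > 600 for max_tokens:
    TIME_CAP = 600 * 128_000 // (60 * 60)   # 21_333

    def requires_streaming(model: str, max_tokens: int, caps: dict[str, int]) -> bool:
        # Stream when either the time formula or the per-model cap trips.
        return max_tokens > TIME_CAP or max_tokens > caps.get(model, float("inf"))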

Both fixes compose: thinking_budget=24576 with max_output_tokens=24576
lifts max_tokens to 25577, which then routes through streaming. The
retry path re-evaluates the dispatch decision independently, so a
request whose post-lift max_tokens sits above the threshold streams on
both the primary attempt and the BadRequestError retry.
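Worked numbers for that interaction (they match
test_interaction_thinking_and_streaming in the test diff):

    max_tokens = 24576 + 1000 + 1   # Fix A lifts 24576 -> 25577
    assert max_tokens == 25577
    assert max_tokens > 21_333      # Fix B then routes the call via messages.stream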

Live validation with production-scale budgets is deferred:
ANTHROPIC_API_KEY is not available in this worktree. Unit tests cover
Fix A (adjust / no-adjust / async), Fix B (create vs. stream per
model, sync and async), the Fix A → Fix B interaction, and the
tool-use fallback routing under streaming.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+340 -6

tests/test_anthropic.py (+275)
···
 ):
     # Create mock Anthropic client
     anthropic_stub = types.ModuleType("anthropic")
+    anthropic_constants_stub = types.ModuleType("anthropic._constants")
     anthropic_types_stub = types.ModuleType("anthropic.types")

     class DummyClient:
···
     anthropic_stub.Anthropic = DummyClient
     anthropic_stub.AsyncAnthropic = DummyClient  # Add async version
     anthropic_stub.BadRequestError = DummyBadRequestError
+    anthropic_constants_stub.MODEL_NONSTREAMING_TOKENS = {
+        "claude-opus-4-20250514": 8192,
+        "claude-opus-4-0": 8192,
+        "claude-4-opus-20250514": 8192,
+        "anthropic.claude-opus-4-20250514-v1:0": 8192,
+        "claude-opus-4@20250514": 8192,
+        "claude-opus-4-1-20250805": 8192,
+        "anthropic.claude-opus-4-1-20250805-v1:0": 8192,
+        "claude-opus-4-1@20250805": 8192,
+    }

     # Add types to the types module
+    anthropic_types_stub.Message = SimpleNamespace
     anthropic_types_stub.MessageParam = dict
     anthropic_types_stub.ToolParam = dict
     anthropic_types_stub.ToolUseBlock = SimpleNamespace
···
     # Stub out the anthropic module
     if "anthropic" in sys.modules:
         sys.modules.pop("anthropic")
+    if "anthropic._constants" in sys.modules:
+        sys.modules.pop("anthropic._constants")
     if "anthropic.types" in sys.modules:
         sys.modules.pop("anthropic.types")
     sys.modules["anthropic"] = anthropic_stub
+    sys.modules["anthropic._constants"] = anthropic_constants_stub
     sys.modules["anthropic.types"] = anthropic_types_stub


···
             "format": {"type": "json_schema", "schema": schema}
         }
         assert call_kwargs["system"] == "base"
+
+
+def _make_response(content=None, stop_reason="end_turn"):
+    response = MagicMock()
+    response.content = (
+        content if content is not None else [SimpleNamespace(type="text", text="ok")]
+    )
+    response.usage = None
+    response.stop_reason = stop_reason
+    return response
+
+
+def _make_stream_cm(final_message, *, enter_side_effect=None):
+    cm = MagicMock()
+    if enter_side_effect is None:
+        stream = MagicMock()
+        stream.get_final_message = MagicMock(return_value=final_message)
+        cm.__enter__ = MagicMock(return_value=stream)
+    else:
+        cm.__enter__ = MagicMock(side_effect=enter_side_effect)
+    cm.__exit__ = MagicMock(return_value=False)
+    return cm
+
+
+def _make_async_stream_cm(final_message):
+    cm = MagicMock()
+    stream = MagicMock()
+    stream.get_final_message = AsyncMock(return_value=final_message)
+    cm.__aenter__ = AsyncMock(return_value=stream)
+    cm.__aexit__ = AsyncMock(return_value=False)
+    return cm
+
+
+class TestBudgetAdjustment:
+    def test_lifts_max_tokens_when_collides_with_thinking_budget(self, monkeypatch):
+        provider = importlib.reload(
+            importlib.import_module("think.providers.anthropic")
+        )
+        mock_client = MagicMock()
+        mock_client.messages.create.return_value = _make_response()
+        monkeypatch.setattr(provider, "_get_anthropic_client", lambda: mock_client)
+
+        provider.run_generate(
+            "hello",
+            thinking_budget=4096,
+            max_output_tokens=4096,
+        )
+
+        call_kwargs = mock_client.messages.create.call_args.kwargs
+        assert call_kwargs["max_tokens"] == 4096 + 1000 + 1
+        assert call_kwargs["thinking"]["budget_tokens"] == 4096
+
+    def test_leaves_max_tokens_when_buffer_satisfied(self, monkeypatch):
+        provider = importlib.reload(
+            importlib.import_module("think.providers.anthropic")
+        )
+        mock_client = MagicMock()
+        mock_client.messages.create.return_value = _make_response()
+        monkeypatch.setattr(provider, "_get_anthropic_client", lambda: mock_client)
+
+        provider.run_generate(
+            "hello",
+            thinking_budget=4096,
+            max_output_tokens=8192,
+        )
+
+        call_kwargs = mock_client.messages.create.call_args.kwargs
+        assert call_kwargs["max_tokens"] == 8192
+
+    def test_no_adjustment_without_thinking(self, monkeypatch):
+        provider = importlib.reload(
+            importlib.import_module("think.providers.anthropic")
+        )
+        mock_client = MagicMock()
+        mock_client.messages.create.return_value = _make_response()
+        monkeypatch.setattr(provider, "_get_anthropic_client", lambda: mock_client)
+
+        provider.run_generate(
+            "hello",
+            thinking_budget=0,
+            max_output_tokens=4096,
+        )
+
+        call_kwargs = mock_client.messages.create.call_args.kwargs
+        assert call_kwargs["max_tokens"] == 4096
+        assert "thinking" not in call_kwargs
+
+    def test_async_lifts_max_tokens_when_collides(self, monkeypatch):
+        provider = importlib.reload(
+            importlib.import_module("think.providers.anthropic")
+        )
+        mock_client = MagicMock()
+        mock_client.messages.create = AsyncMock(return_value=_make_response())
+        monkeypatch.setattr(
+            provider, "_get_async_anthropic_client", lambda: mock_client
+        )
+
+        asyncio.run(
+            provider.run_agenerate(
+                "hello",
+                thinking_budget=4096,
+                max_output_tokens=4096,
+            )
+        )
+
+        call_kwargs = mock_client.messages.create.call_args.kwargs
+        assert call_kwargs["max_tokens"] == 4096 + 1000 + 1
+        assert call_kwargs["thinking"]["budget_tokens"] == 4096
+
+
+class TestStreamingDispatch:
+    def test_streams_when_max_tokens_exceeds_time_formula(self, monkeypatch):
+        provider = importlib.reload(
+            importlib.import_module("think.providers.anthropic")
+        )
+        mock_client = MagicMock()
+        mock_client.messages.stream.return_value = _make_stream_cm(_make_response())
+        monkeypatch.setattr(provider, "_get_anthropic_client", lambda: mock_client)
+
+        provider.run_generate(
+            "hello",
+            model="claude-sonnet-4-5",
+            max_output_tokens=49152,
+        )
+
+        assert mock_client.messages.stream.call_count == 1
+        assert mock_client.messages.create.call_count == 0
+
+    def test_uses_create_below_threshold(self, monkeypatch):
+        provider = importlib.reload(
+            importlib.import_module("think.providers.anthropic")
+        )
+        mock_client = MagicMock()
+        mock_client.messages.create.return_value = _make_response()
+        monkeypatch.setattr(provider, "_get_anthropic_client", lambda: mock_client)
+
+        provider.run_generate(
+            "hello",
+            model="claude-sonnet-4-5",
+            max_output_tokens=8192,
+        )
+
+        assert mock_client.messages.create.call_count == 1
+        assert mock_client.messages.stream.call_count == 0
+
+    def test_streams_when_per_model_cap_exceeded(self, monkeypatch):
+        provider = importlib.reload(
+            importlib.import_module("think.providers.anthropic")
+        )
+        mock_client = MagicMock()
+        mock_client.messages.stream.return_value = _make_stream_cm(_make_response())
+        monkeypatch.setattr(provider, "_get_anthropic_client", lambda: mock_client)
+
+        provider.run_generate(
+            "hello",
+            model="claude-opus-4-1-20250805",
+            max_output_tokens=16384,
+        )
+
+        assert mock_client.messages.stream.call_count == 1
+        assert mock_client.messages.create.call_count == 0
+
+    def test_async_streams_when_max_tokens_exceeds_time_formula(self, monkeypatch):
+        provider = importlib.reload(
+            importlib.import_module("think.providers.anthropic")
+        )
+        mock_client = MagicMock()
+        mock_client.messages.stream.return_value = _make_async_stream_cm(
+            _make_response()
+        )
+        monkeypatch.setattr(
+            provider, "_get_async_anthropic_client", lambda: mock_client
+        )
+
+        asyncio.run(
+            provider.run_agenerate(
+                "hello",
+                model="claude-sonnet-4-5",
+                max_output_tokens=49152,
+            )
+        )
+
+        assert mock_client.messages.stream.call_count == 1
+        assert mock_client.messages.create.call_count == 0
+
+    def test_async_uses_create_below_threshold(self, monkeypatch):
+        provider = importlib.reload(
+            importlib.import_module("think.providers.anthropic")
+        )
+        mock_client = MagicMock()
+        mock_client.messages.create = AsyncMock(return_value=_make_response())
+        monkeypatch.setattr(
+            provider, "_get_async_anthropic_client", lambda: mock_client
+        )
+
+        asyncio.run(
+            provider.run_agenerate(
+                "hello",
+                model="claude-sonnet-4-5",
+                max_output_tokens=8192,
+            )
+        )
+
+        assert mock_client.messages.create.call_count == 1
+        assert mock_client.messages.stream.call_count == 0
+
+    def test_streams_tool_use_fallback_extracts_json(self, monkeypatch):
+        provider = importlib.reload(
+            importlib.import_module("think.providers.anthropic")
+        )
+        mock_client = MagicMock()
+
+        class DummyBadRequestError(Exception):
+            pass
+
+        fallback_response = _make_response(
+            content=[SimpleNamespace(type="tool_use", input={"key": "value"})]
+        )
+        # Raising from __enter__ is a sufficient stand-in for the SDK surfacing
+        # the BadRequestError before a streaming response is available.
+        mock_client.messages.stream.side_effect = [
+            _make_stream_cm(
+                None,
+                enter_side_effect=DummyBadRequestError("bad schema"),
+            ),
+            _make_stream_cm(fallback_response),
+        ]
+
+        monkeypatch.setattr(provider, "BadRequestError", DummyBadRequestError)
+        monkeypatch.setattr(provider, "_get_anthropic_client", lambda: mock_client)
+
+        result = provider.run_generate(
+            "hello",
+            model="claude-sonnet-4-5",
+            json_schema={"type": "object"},
+            max_output_tokens=49152,
+        )
+
+        assert mock_client.messages.stream.call_count == 2
+        assert mock_client.messages.create.call_count == 0
+        assert json.loads(result["text"]) == {"key": "value"}
+
+    def test_interaction_thinking_and_streaming(self, monkeypatch):
+        provider = importlib.reload(
+            importlib.import_module("think.providers.anthropic")
+        )
+        mock_client = MagicMock()
+        mock_client.messages.stream.return_value = _make_stream_cm(_make_response())
+        monkeypatch.setattr(provider, "_get_anthropic_client", lambda: mock_client)
+
+        provider.run_generate(
+            "hello",
+            model="claude-sonnet-4-5",
+            thinking_budget=24576,
+            max_output_tokens=24576,
+        )
+
+        assert mock_client.messages.stream.call_count == 1
+        call_kwargs = mock_client.messages.stream.call_args.kwargs
+        assert call_kwargs["max_tokens"] == 25577
think/providers/anthropic.py (+65 -6)
···
 from typing import Any, Callable

 from anthropic import AsyncAnthropic, BadRequestError
+from anthropic._constants import MODEL_NONSTREAMING_TOKENS
 from anthropic.types import (
+    Message,
     MessageParam,
     RedactedThinkingBlock,
     ThinkingBlock,
···
 _DEFAULT_MAX_TOKENS = 8096 * 2
 _MIN_THINKING_BUDGET = 1024  # Anthropic minimum
 _DEFAULT_THINKING_BUDGET = 10000
+# Anthropic rejects requests where max_tokens <= thinking.budget_tokens.
+_MAX_TOKENS_BUFFER = 1000
+# SDK time formula: 60 * 60 * max_tokens / 128_000 > 600, i.e. max_tokens > ~21,333.
+_NONSTREAMING_TIME_CAP_TOKENS = 21_333
+# anthropic._constants.MODEL_NONSTREAMING_TOKENS adds per-model
+# non-streaming caps on top of this threshold.


 def _compute_thinking_params(max_tokens: int) -> tuple[int, int]:
···
     raise ValueError("Anthropic schema fallback response missing tool_use block")


+def _adjust_budget_for_thinking(request_kwargs: dict[str, Any]) -> None:
+    """Lift max_tokens when thinking budget would otherwise collide with Anthropic validation."""
+    thinking = request_kwargs.get("thinking")
+    if not thinking:
+        return
+
+    budget_tokens = thinking.get("budget_tokens")
+    if not budget_tokens or budget_tokens <= 0:
+        return
+
+    max_tokens = request_kwargs["max_tokens"]
+    minimum_max_tokens = budget_tokens + _MAX_TOKENS_BUFFER + 1
+    if max_tokens <= budget_tokens + _MAX_TOKENS_BUFFER:
+        logger.info(
+            "Adjusted Anthropic max_tokens for thinking budget: %s -> %s",
+            max_tokens,
+            minimum_max_tokens,
+        )
+        # Anthropic requires max_tokens > thinking.budget_tokens; lift rather
+        # than clamp thinking so the caller's stated output floor is preserved.
+        request_kwargs["max_tokens"] = minimum_max_tokens
+
+
+def _requires_streaming(model: str, max_tokens: int) -> bool:
+    """Return whether the Anthropic SDK would require streaming for this request."""
+    if max_tokens > _NONSTREAMING_TIME_CAP_TOKENS:
+        return True
+    cap = MODEL_NONSTREAMING_TOKENS.get(model)
+    if cap is not None and max_tokens > cap:
+        return True
+    return False
+
+
+def _send_message(client: Any, request_kwargs: dict[str, Any]) -> Message:
+    """Dispatch sync message requests via create or stream based on Anthropic limits."""
+    if _requires_streaming(request_kwargs["model"], request_kwargs["max_tokens"]):
+        with client.messages.stream(**request_kwargs) as stream:
+            return stream.get_final_message()
+    return client.messages.create(**request_kwargs)
+
+
+async def _asend_message(client: Any, request_kwargs: dict[str, Any]) -> Message:
+    """Dispatch async message requests via create or stream based on Anthropic limits."""
+    if _requires_streaming(request_kwargs["model"], request_kwargs["max_tokens"]):
+        async with client.messages.stream(**request_kwargs) as stream:
+            return await stream.get_final_message()
+    return await client.messages.create(**request_kwargs)
+
+
 # Cache for Anthropic clients
 _anthropic_client = None
 _async_anthropic_client = None
···
         }
     else:
         request_kwargs["temperature"] = temperature
+    _adjust_budget_for_thinking(request_kwargs)

     if timeout_s:
         request_kwargs["timeout"] = timeout_s
···
             "format": {"type": "json_schema", "schema": json_schema}
         }
     try:
-        response = client.messages.create(**request_kwargs)
+        response = _send_message(client, request_kwargs)
         text, thinking = _extract_text_and_thinking(response)
     except BadRequestError:
         retry_kwargs = dict(request_kwargs)
···
             }
         ]
         retry_kwargs["tool_choice"] = {"type": "tool", "name": tool_name}
-        response = client.messages.create(**retry_kwargs)
+        response = _send_message(client, retry_kwargs)
         text = _extract_first_tool_use_json(response)
         _, thinking = _extract_text_and_thinking(response)
     else:
-        response = client.messages.create(**request_kwargs)
+        response = _send_message(client, request_kwargs)
         text, thinking = _extract_text_and_thinking(response)

     return GenerateResult(
···
         }
     else:
         request_kwargs["temperature"] = temperature
+    _adjust_budget_for_thinking(request_kwargs)

     if timeout_s:
         request_kwargs["timeout"] = timeout_s
···
             "format": {"type": "json_schema", "schema": json_schema}
         }
     try:
-        response = await client.messages.create(**request_kwargs)
+        response = await _asend_message(client, request_kwargs)
         text, thinking = _extract_text_and_thinking(response)
     except BadRequestError:
         retry_kwargs = dict(request_kwargs)
···
             }
         ]
         retry_kwargs["tool_choice"] = {"type": "tool", "name": tool_name}
-        response = await client.messages.create(**retry_kwargs)
+        response = await _asend_message(client, retry_kwargs)
         text = _extract_first_tool_use_json(response)
         _, thinking = _extract_text_and_thinking(response)
     else:
-        response = await client.messages.create(**request_kwargs)
+        response = await _asend_message(client, request_kwargs)
         text, thinking = _extract_text_and_thinking(response)

     return GenerateResult(