personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

models: route talent health by provider row

Read talent health from health/talents.json, gate fallback by provider/model/interface rows, and record quota exhaustion idempotently with reset timing and recomputed summaries. This gives the quota fallback path a durable health signal instead of relying on provider-wide failure state.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+417 -14
+103
tests/test_models_health.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + import json 5 + 6 + import pytest 7 + 8 + from think.models import record_provider_failure 9 + 10 + 11 + def _read_health(tmp_path): 12 + return json.loads((tmp_path / "health" / "talents.json").read_text()) 13 + 14 + 15 + def test_record_provider_failure_appends_new_row(monkeypatch, tmp_path): 16 + monkeypatch.setenv("SOLSTONE_JOURNAL", str(tmp_path)) 17 + 18 + record_provider_failure( 19 + "google", 20 + "flash", 21 + "gemini-3-flash-preview", 22 + "cogitate", 23 + 12345, 24 + ) 25 + 26 + payload = _read_health(tmp_path) 27 + assert payload["summary"] == {"total": 1, "passed": 0, "skipped": 0, "failed": 1} 28 + row = payload["results"][0] 29 + assert row["provider"] == "google" 30 + assert row["tier"] == "flash" 31 + assert row["model"] == "gemini-3-flash-preview" 32 + assert row["interface"] == "cogitate" 33 + assert row["ok"] is False 34 + assert row["status"] == "quota_exhausted" 35 + assert row["message"] == "Quota exhausted; retry after 12345" 36 + assert row["elapsed_s"] == 0.0 37 + assert row["reset_at_ms"] == 12345 38 + assert isinstance(row["recorded_at"], str) 39 + 40 + 41 + def test_record_provider_failure_updates_duplicate_key(monkeypatch, tmp_path): 42 + monkeypatch.setenv("SOLSTONE_JOURNAL", str(tmp_path)) 43 + 44 + record_provider_failure("google", "flash", "gemini", "cogitate", 100) 45 + record_provider_failure("google", "flash", "gemini", "cogitate", 200) 46 + 47 + payload = _read_health(tmp_path) 48 + assert len(payload["results"]) == 1 49 + row = payload["results"][0] 50 + assert row["reset_at_ms"] == 200 51 + assert row["message"] == "Quota exhausted; retry after 200" 52 + assert row["ok"] is False 53 + assert row["status"] == "quota_exhausted" 54 + assert row["elapsed_s"] == 0.0 55 + 56 + 57 + def test_record_provider_failure_recomputes_summary(monkeypatch, tmp_path): 58 + monkeypatch.setenv("SOLSTONE_JOURNAL", str(tmp_path)) 59 + health_dir = tmp_path / "health" 60 + health_dir.mkdir() 61 + (health_dir / "talents.json").write_text( 62 + json.dumps( 63 + { 64 + "results": [ 65 + {"provider": "openai", "status": "ok", "ok": True}, 66 + {"provider": "anthropic", "status": "skip", "ok": True}, 67 + ], 68 + "summary": {"total": 999, "passed": 999, "skipped": 0, "failed": 0}, 69 + "checked_at": "2026-01-01T00:00:00+00:00", 70 + } 71 + ) 72 + ) 73 + 74 + record_provider_failure("google", "flash", "gemini", "cogitate", 300) 75 + 76 + payload = _read_health(tmp_path) 77 + assert payload["summary"] == {"total": 3, "passed": 1, "skipped": 1, "failed": 1} 78 + assert payload["checked_at"] == "2026-01-01T00:00:00+00:00" 79 + 80 + 81 + def test_record_provider_failure_atomic_replace_failure_preserves_file( 82 + monkeypatch, tmp_path 83 + ): 84 + monkeypatch.setenv("SOLSTONE_JOURNAL", str(tmp_path)) 85 + health_dir = tmp_path / "health" 86 + health_dir.mkdir() 87 + health_path = health_dir / "talents.json" 88 + original = { 89 + "results": [{"provider": "openai", "status": "ok", "ok": True}], 90 + "summary": {"total": 1, "passed": 1, "skipped": 0, "failed": 0}, 91 + "checked_at": "2026-01-01T00:00:00+00:00", 92 + } 93 + health_path.write_text(json.dumps(original), encoding="utf-8") 94 + 95 + def fail_replace(_src, _dst): 96 + raise OSError("replace failed") 97 + 98 + monkeypatch.setattr("think.models.os.replace", fail_replace) 99 + 100 + with pytest.raises(OSError, match="replace failed"): 101 + record_provider_failure("google", "flash", "gemini", "cogitate", 400) 102 + 103 + assert json.loads(health_path.read_text()) == original
+168 -3
tests/test_talent_fallback.py
··· 14 14 TYPE_DEFAULTS, 15 15 get_backup_provider, 16 16 is_provider_healthy, 17 + is_provider_model_interface_healthy, 17 18 should_recheck_health, 18 19 ) 20 + from think.providers.cli import QuotaExhaustedError 19 21 from think.talents import _is_retryable_error 22 + from think.utils import now_ms 20 23 21 24 22 25 def test_is_provider_healthy_all_failed(): ··· 48 51 assert is_provider_healthy("google", health_data) is True 49 52 50 53 54 + def test_is_provider_model_interface_healthy_match_failed(): 55 + health_data = { 56 + "results": [ 57 + { 58 + "provider": "google", 59 + "model": "gemini-3-flash-preview", 60 + "interface": "cogitate", 61 + "ok": False, 62 + } 63 + ] 64 + } 65 + assert ( 66 + is_provider_model_interface_healthy( 67 + "google", "gemini-3-flash-preview", "cogitate", health_data 68 + ) 69 + is False 70 + ) 71 + 72 + 73 + def test_is_provider_model_interface_healthy_mismatch_is_healthy(): 74 + health_data = { 75 + "results": [ 76 + { 77 + "provider": "google", 78 + "model": "gemini-3-flash-preview", 79 + "interface": "generate", 80 + "ok": False, 81 + } 82 + ] 83 + } 84 + assert ( 85 + is_provider_model_interface_healthy( 86 + "google", "gemini-3-flash-preview", "cogitate", health_data 87 + ) 88 + is True 89 + ) 90 + 91 + 92 + def test_is_provider_model_interface_healthy_missing_fields_are_healthy(): 93 + health_data = {"results": [{"provider": "google", "ok": False}]} 94 + assert ( 95 + is_provider_model_interface_healthy( 96 + "google", "gemini-3-flash-preview", "cogitate", health_data 97 + ) 98 + is True 99 + ) 100 + 101 + 102 + def test_is_provider_model_interface_healthy_none_data(): 103 + assert ( 104 + is_provider_model_interface_healthy( 105 + "google", "gemini-3-flash-preview", "cogitate", None 106 + ) 107 + is True 108 + ) 109 + 110 + 51 111 def test_should_recheck_health_stale(): 52 112 checked_at = (datetime.now(timezone.utc) - timedelta(hours=2)).isoformat() 53 113 health_data = {"checked_at": checked_at} ··· 58 118 checked_at = (datetime.now(timezone.utc) - timedelta(minutes=10)).isoformat() 59 119 health_data = {"checked_at": checked_at} 60 120 assert should_recheck_health(health_data) is False 121 + 122 + 123 + def test_should_recheck_health_honors_reset_at_ms(): 124 + checked_at = (datetime.now(timezone.utc) - timedelta(hours=2)).isoformat() 125 + pre_reset = { 126 + "checked_at": checked_at, 127 + "results": [{"ok": False, "reset_at_ms": now_ms() + 60_000}], 128 + } 129 + post_reset = { 130 + "checked_at": checked_at, 131 + "results": [{"ok": False, "reset_at_ms": now_ms() - 1_000}], 132 + } 133 + no_reset = { 134 + "checked_at": checked_at, 135 + "results": [{"ok": False}], 136 + } 137 + 138 + assert should_recheck_health(pre_reset) is False 139 + assert should_recheck_health(post_reset) is True 140 + assert should_recheck_health(no_reset) is True 61 141 62 142 63 143 def test_get_backup_provider_from_config(monkeypatch): ··· 117 197 _patch_prepare_config_dependencies(monkeypatch) 118 198 monkeypatch.setattr( 119 199 "think.models.load_health_status", 120 - lambda: {"results": [{"provider": "google", "ok": False}]}, 200 + lambda: { 201 + "results": [ 202 + { 203 + "provider": "google", 204 + "model": "gemini-3-flash-preview", 205 + "interface": "cogitate", 206 + "ok": False, 207 + } 208 + ] 209 + }, 121 210 ) 122 211 monkeypatch.setattr("think.models.should_recheck_health", lambda _h: False) 123 212 monkeypatch.setattr("think.models.get_backup_provider", lambda _type: "anthropic") ··· 140 229 _patch_prepare_config_dependencies(monkeypatch) 141 230 monkeypatch.setattr( 142 231 "think.models.load_health_status", 143 - lambda: {"results": [{"provider": "google", "ok": True}]}, 232 + lambda: { 233 + "results": [ 234 + { 235 + "provider": "google", 236 + "model": "gemini-3-flash-preview", 237 + "interface": "cogitate", 238 + "ok": True, 239 + } 240 + ] 241 + }, 144 242 ) 145 243 monkeypatch.setattr("think.models.should_recheck_health", lambda _h: False) 146 244 ··· 156 254 _patch_prepare_config_dependencies(monkeypatch) 157 255 monkeypatch.setattr( 158 256 "think.models.load_health_status", 159 - lambda: {"results": [{"provider": "google", "ok": False}]}, 257 + lambda: { 258 + "results": [ 259 + { 260 + "provider": "google", 261 + "model": "gemini-3-flash-preview", 262 + "interface": "cogitate", 263 + "ok": False, 264 + } 265 + ] 266 + }, 160 267 ) 161 268 monkeypatch.setattr("think.models.should_recheck_health", lambda _h: False) 162 269 monkeypatch.setattr("think.models.get_backup_provider", lambda _type: "anthropic") ··· 216 323 assert config["model"] == "claude-sonnet-4-5" 217 324 assert config["fallback_from"] == "google" 218 325 assert any(e.get("event") == "fallback" for e in events) 326 + 327 + 328 + def test_quota_failure_records_health_and_falls_back(monkeypatch): 329 + from think.talents import _execute_with_tools 330 + 331 + events = [] 332 + record_mock = MagicMock() 333 + 334 + async def fail_quota(*_args, **_kwargs): 335 + raise QuotaExhaustedError("quota exhausted", retry_delay_ms=1000) 336 + 337 + async def pass_cogitate(*_args, **kwargs): 338 + on_event = kwargs.get("on_event") 339 + if on_event: 340 + on_event({"event": "finish", "result": "backup result"}) 341 + return "backup result" 342 + 343 + monkeypatch.setattr( 344 + "think.providers.PROVIDER_REGISTRY", {"google": "x", "anthropic": "y"} 345 + ) 346 + monkeypatch.setattr( 347 + "think.providers.get_provider_module", 348 + lambda provider: SimpleNamespace( 349 + run_cogitate=fail_quota if provider == "google" else pass_cogitate 350 + ), 351 + ) 352 + monkeypatch.setattr("think.models.get_backup_provider", lambda _type: "anthropic") 353 + monkeypatch.setattr( 354 + "think.models.resolve_model_for_provider", 355 + lambda _context, _provider, _type="cogitate": "claude-sonnet-4-5", 356 + ) 357 + monkeypatch.setattr("think.models.record_provider_failure", record_mock) 358 + monkeypatch.setenv("ANTHROPIC_API_KEY", "test-key") 359 + 360 + config = { 361 + "type": "cogitate", 362 + "provider": "google", 363 + "tier": "flash", 364 + "model": "gemini-3-flash-preview", 365 + "health_stale": False, 366 + "context": "talent.system.default", 367 + } 368 + before_ms = now_ms() 369 + 370 + asyncio.run(_execute_with_tools(config, events.append)) 371 + 372 + quota_event = next(e for e in events if e.get("reason") == "quota_exhausted") 373 + assert quota_event["terminal"] is False 374 + assert quota_event["reset_at_ms"] >= before_ms + 1000 375 + record_mock.assert_called_once_with( 376 + "google", 377 + "flash", 378 + "gemini-3-flash-preview", 379 + "cogitate", 380 + quota_event["reset_at_ms"], 381 + ) 382 + assert config["provider"] == "anthropic" 383 + assert events[-1]["event"] == "finish" 219 384 220 385 221 386 def test_on_failure_retry_cogitate_uses_context_from_name(monkeypatch):
+115 -4
think/models.py
··· 1 1 # SPDX-License-Identifier: AGPL-3.0-only 2 2 # Copyright (c) 2026 sol pbc 3 3 4 + import fcntl 4 5 import fnmatch 5 6 import inspect 6 7 import json ··· 15 16 import frontmatter 16 17 from jsonschema import Draft202012Validator 17 18 18 - from think.utils import get_config, get_journal 19 + from think.utils import get_config, get_journal, now_ms 19 20 20 21 logger = logging.getLogger(__name__) 21 22 ··· 1117 1118 1118 1119 1119 1120 def load_health_status() -> Optional[dict]: 1120 - """Load health status from journal/health/agents.json. 1121 + """Load health status from journal/health/talents.json. 1121 1122 1122 1123 Returns parsed dict or None if file is missing/unreadable. 1123 1124 """ 1124 1125 try: 1125 - health_path = Path(get_journal()) / "health" / "agents.json" 1126 + health_path = Path(get_journal()) / "health" / "talents.json" 1126 1127 with open(health_path) as f: 1127 1128 return json.load(f) 1128 1129 except (FileNotFoundError, json.JSONDecodeError, OSError): ··· 1148 1149 return any(r.get("ok") for r in provider_results) 1149 1150 1150 1151 1152 + def is_provider_model_interface_healthy( 1153 + provider: str, 1154 + model: str, 1155 + interface: str, 1156 + health_data: Optional[dict], 1157 + ) -> bool: 1158 + """Check health for a specific provider/model/interface row.""" 1159 + if health_data is None: 1160 + return True 1161 + for row in health_data.get("results", []): 1162 + if ( 1163 + row.get("provider") == provider 1164 + and row.get("model") == model 1165 + and row.get("interface") == interface 1166 + and row.get("ok") is False 1167 + ): 1168 + return False 1169 + return True 1170 + 1171 + 1172 + def _summarize_health_results(results: list[dict[str, Any]]) -> dict[str, int]: 1173 + return { 1174 + "total": len(results), 1175 + "passed": sum(1 for row in results if row.get("status") == "ok"), 1176 + "skipped": sum(1 for row in results if row.get("status") == "skip"), 1177 + "failed": sum(1 for row in results if row.get("ok") is False), 1178 + } 1179 + 1180 + 1181 + def record_provider_failure( 1182 + provider: str, 1183 + tier: str, 1184 + model: str, 1185 + interface: str, 1186 + reset_at_ms: int, 1187 + ) -> None: 1188 + """Record a provider/model/interface quota failure in health status.""" 1189 + health_dir = Path(get_journal()) / "health" 1190 + health_dir.mkdir(parents=True, exist_ok=True) 1191 + health_path = health_dir / "talents.json" 1192 + lock_path = health_dir / "talents.json.lock" 1193 + tmp_path = health_dir / f".talents.json.{os.getpid()}.{now_ms()}.tmp" 1194 + recorded_at = datetime.now(timezone.utc).isoformat() 1195 + message = f"Quota exhausted; retry after {reset_at_ms}" 1196 + 1197 + with open(lock_path, "w", encoding="utf-8") as lock_file: 1198 + fcntl.flock(lock_file, fcntl.LOCK_EX) 1199 + try: 1200 + try: 1201 + with open(health_path, encoding="utf-8") as health_file: 1202 + payload = json.load(health_file) 1203 + except (FileNotFoundError, json.JSONDecodeError, OSError): 1204 + payload = {} 1205 + 1206 + results = payload.get("results", []) 1207 + if not isinstance(results, list): 1208 + results = [] 1209 + failure_row = { 1210 + "provider": provider, 1211 + "tier": tier, 1212 + "model": model, 1213 + "interface": interface, 1214 + "ok": False, 1215 + "status": "quota_exhausted", 1216 + "message": message, 1217 + "elapsed_s": 0.0, 1218 + "reset_at_ms": reset_at_ms, 1219 + "recorded_at": recorded_at, 1220 + } 1221 + 1222 + for row in results: 1223 + if ( 1224 + row.get("provider") == provider 1225 + and row.get("model") == model 1226 + and row.get("interface") == interface 1227 + ): 1228 + row.update(failure_row) 1229 + break 1230 + else: 1231 + results.append(failure_row) 1232 + 1233 + payload["results"] = results 1234 + payload["summary"] = _summarize_health_results(results) 1235 + payload.setdefault("checked_at", recorded_at) 1236 + 1237 + with open(tmp_path, "w", encoding="utf-8") as tmp_file: 1238 + json.dump(payload, tmp_file, indent=2) 1239 + tmp_file.write("\n") 1240 + tmp_file.flush() 1241 + os.fsync(tmp_file.fileno()) 1242 + os.replace(tmp_path, health_path) 1243 + finally: 1244 + try: 1245 + tmp_path.unlink() 1246 + except FileNotFoundError: 1247 + pass 1248 + 1249 + 1151 1250 def should_recheck_health(health_data: Optional[dict]) -> bool: 1152 - """Check if health data is stale (>1 hour old). 1251 + """Check if health data should be rechecked. 1153 1252 1154 1253 Returns False when health_data is None or on parse errors. 1155 1254 """ 1156 1255 if health_data is None: 1157 1256 return False 1257 + failed_rows = [ 1258 + row for row in health_data.get("results", []) if row.get("ok") is False 1259 + ] 1260 + reset_values = [ 1261 + int(row["reset_at_ms"]) 1262 + for row in failed_rows 1263 + if isinstance(row.get("reset_at_ms"), (int, float)) 1264 + ] 1265 + missing_reset = len(reset_values) < len(failed_rows) 1266 + if reset_values and not missing_reset: 1267 + return now_ms() > min(reset_values) 1268 + 1158 1269 checked_at = health_data.get("checked_at") 1159 1270 if not checked_at: 1160 1271 return False
+27 -3
think/talents.py
··· 466 466 Returns: 467 467 Fully prepared config dict 468 468 """ 469 - from think.models import resolve_model_for_provider, resolve_provider 469 + from think.models import ( 470 + TIER_FLASH, 471 + TIER_LITE, 472 + TIER_PRO, 473 + _resolve_tier, 474 + resolve_model_for_provider, 475 + resolve_provider, 476 + ) 470 477 from think.talent import get_talent, key_to_context 471 478 472 479 name = request["name"] ··· 544 551 config["provider"] = provider 545 552 config["model"] = model 546 553 config["context"] = context 554 + tier = _resolve_tier(context, talent_type) 555 + config["tier"] = { 556 + TIER_PRO: "pro", 557 + TIER_FLASH: "flash", 558 + TIER_LITE: "lite", 559 + }.get(tier, str(tier)) 547 560 548 561 # --- Provider fallback: preflight swap if primary is unhealthy --- 549 562 from think.models import ( 550 563 get_backup_provider, 551 - is_provider_healthy, 564 + is_provider_model_interface_healthy, 552 565 load_health_status, 553 566 should_recheck_health, 554 567 ) ··· 557 570 health_data = load_health_status() 558 571 config["health_stale"] = should_recheck_health(health_data) 559 572 560 - if not is_provider_healthy(provider, health_data): 573 + if not is_provider_model_interface_healthy( 574 + provider, model, talent_type, health_data 575 + ): 561 576 backup = get_backup_provider(talent_type) 562 577 if backup and backup != provider: 563 578 env_key = PROVIDER_METADATA.get(backup, {}).get("env_key") ··· 879 894 "reset_at_ms": reset_at_ms, 880 895 "terminal": False, 881 896 } 897 + ) 898 + from think.models import record_provider_failure 899 + 900 + record_provider_failure( 901 + provider, 902 + config["tier"], 903 + config["model"], 904 + config["type"], 905 + reset_at_ms, 882 906 ) 883 907 from think.models import ( 884 908 get_backup_provider,
+4 -4
think/top.py
··· 74 74 self.think_last_completed = {} # Last think/completed event 75 75 self.think_running = False # Whether a think run is active 76 76 77 - # Agents health tracking (from health/agents.json file) 78 - self.agents_health = None # Parsed agents.json dict, or None 77 + # Agents health tracking (from health/talents.json file) 78 + self.agents_health = None # Parsed talents.json dict, or None 79 79 self.agents_health_ts = 0.0 # Last time health file was read 80 80 self.AGENTS_HEALTH_INTERVAL = 30 # Seconds between file re-reads 81 81 ··· 762 762 return self.displayed_mode 763 763 764 764 def _load_agents_health(self) -> None: 765 - """Read and cache health/agents.json from the journal.""" 765 + """Read and cache health/talents.json from the journal.""" 766 766 self.agents_health_ts = time.time() 767 767 try: 768 - path = Path(get_journal()) / "health" / "agents.json" 768 + path = Path(get_journal()) / "health" / "talents.json" 769 769 self.agents_health = json.loads(path.read_text()) 770 770 except (FileNotFoundError, json.JSONDecodeError, OSError): 771 771 self.agents_health = None