personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

convey/chat: add singleton watchdogs and routing warnings

Arm per-use watchdog timers for active chat requests and spawned talents, and cancel them before routed state clears so slow cortex work cannot wedge the singleton. Also log unrouteable finish/error events and cover the real exec dispatch path plus timeout behavior in chat runtime tests.

+392 -7
+142 -7
convey/chat.py
··· 37 37 MAX_LOOP_RETRIES = 3 38 38 DEFAULT_STREAM_LIMIT = 200 39 39 MAX_STREAM_LIMIT = 1000 40 + _CHAT_WATCHDOG_SECONDS = 180 40 41 MAX_ACTIVE_REASON = "max active — waiting for one to finish" 41 42 CHAT_TROUBLE_REASON = "chat had trouble — try again" 43 + CHAT_WATCHDOG_REASON = "chat took too long — try again" 42 44 43 45 _DAY_RE = re.compile(r"^\d{8}$") 44 46 _state_lock = threading.Lock() ··· 47 49 _current_chat_state: dict[str, Any] | None = None 48 50 _queued_trigger: dict[str, Any] | None = None 49 51 _active_talents: dict[str, dict[str, Any]] = {} 52 + _watchdog_timers: dict[str, threading.Timer] = {} 50 53 _last_use_id = 0 51 54 _runtime: "ChatRuntimeState | None" = None 52 55 _atexit_registered = False ··· 184 187 """Stop the shared runtime.""" 185 188 global _runtime 186 189 190 + with _state_lock: 191 + for timer in _watchdog_timers.values(): 192 + timer.cancel() 193 + _watchdog_timers.clear() 194 + 187 195 with _runtime_lock: 188 196 runtime = _runtime 189 197 _runtime = None ··· 256 264 "raw_use_id" 257 265 ): 258 266 logical_use_id = str(_current_chat_use_id) 267 + _cancel_watchdog_locked(use_id) 259 268 try: 260 269 parsed = _parse_chat_result(message.get("result")) 261 270 except ValueError: 262 271 if int(_current_chat_state.get("retry_count", 0) or 0) < 1: 263 272 retry_use_id = _reserve_use_id_locked() 264 - _current_chat_state["raw_use_id"] = retry_use_id 273 + _set_current_raw_use_locked(logical_use_id, retry_use_id) 265 274 _current_chat_state["retry_count"] = ( 266 275 int(_current_chat_state.get("retry_count", 0) or 0) + 1 267 276 ) ··· 298 307 requested_task=requested_task, 299 308 ) 300 309 _current_chat_state["retry_count"] = 0 301 - _current_chat_state["raw_use_id"] = None 310 + _set_current_raw_use_locked(logical_use_id, None) 302 311 if requested_target: 303 312 active_talent_count = _active_talent_count_for_today_locked() 304 313 if active_talent_count >= MAX_ACTIVE_TALENTS: ··· 307 316 "reason": MAX_ACTIVE_REASON, 308 317 } 309 318 synthetic_use_id = _reserve_use_id_locked() 310 - _current_chat_state["raw_use_id"] = synthetic_use_id 319 + _set_current_raw_use_locked(logical_use_id, synthetic_use_id) 311 320 next_info = _build_spawn_info_locked(logical_use_id) 312 321 elif _talent_loop_count_locked() >= MAX_LOOP_RETRIES: 313 322 append_chat_event( ··· 363 372 next_info = _clear_current_locked() 364 373 365 374 elif use_id in _active_talents: 375 + _cancel_watchdog_locked(use_id) 366 376 talent_state = _active_talents.pop(use_id) 367 377 logical_use_id = str(talent_state["chat_use_id"]) 368 378 summary = str(message.get("result") or "").strip() ··· 382 392 "name": str(talent_state["target"]), 383 393 "summary": summary, 384 394 } 385 - _current_chat_state["raw_use_id"] = _reserve_use_id_locked() 395 + _set_current_raw_use_locked( 396 + logical_use_id, 397 + _reserve_use_id_locked(), 398 + ) 386 399 _current_chat_state["retry_count"] = 0 387 400 next_info = _build_spawn_info_locked(logical_use_id) 401 + else: 402 + logger.warning( 403 + "unrouteable cortex event use_id=%s event=%s reason=%s", 404 + use_id, 405 + "finish", 406 + "no matching active chat-generate or talent", 407 + ) 388 408 389 409 _run_next_action(next_info) 390 410 if finish_payload is not None: ··· 406 426 "raw_use_id" 407 427 ): 408 428 logical_use_id = str(_current_chat_use_id) 429 + _cancel_watchdog_locked(use_id) 409 430 append_chat_event( 410 431 "chat_error", 411 432 reason=CHAT_TROUBLE_REASON, ··· 414 435 error_payload = {"use_id": logical_use_id, "reason": CHAT_TROUBLE_REASON} 415 436 next_info = _clear_current_locked() 416 437 elif use_id in _active_talents: 438 + _cancel_watchdog_locked(use_id) 417 439 talent_state = _active_talents.pop(use_id) 418 440 logical_use_id = str(talent_state["chat_use_id"]) 419 441 reason = str(message.get("error") or CHAT_TROUBLE_REASON) ··· 433 455 "name": str(talent_state["target"]), 434 456 "reason": reason, 435 457 } 436 - _current_chat_state["raw_use_id"] = _reserve_use_id_locked() 458 + _set_current_raw_use_locked( 459 + logical_use_id, 460 + _reserve_use_id_locked(), 461 + ) 437 462 _current_chat_state["retry_count"] = 0 438 463 next_info = _build_spawn_info_locked(logical_use_id) 464 + else: 465 + logger.warning( 466 + "unrouteable cortex event use_id=%s event=%s reason=%s", 467 + use_id, 468 + "error", 469 + "no matching active chat-generate or talent", 470 + ) 439 471 440 472 _run_next_action(next_info) 441 473 if error_payload is not None: ··· 452 484 if action.get("kind") == "talent": 453 485 if not _spawn_talent(action): 454 486 _handle_talent_spawn_failure(action) 487 + return 488 + with _state_lock: 489 + _arm_watchdog_locked( 490 + str(action["use_id"]), 491 + "talent", 492 + str(action["logical_use_id"]), 493 + ) 455 494 456 495 457 496 def _spawn_chat_generate(action: dict[str, Any]) -> bool: ··· 514 553 def _handle_talent_spawn_failure(action: dict[str, Any]) -> None: 515 554 next_info: dict[str, Any] | None = None 516 555 with _state_lock: 556 + _cancel_watchdog_locked(str(action["use_id"])) 517 557 _active_talents.pop(str(action["use_id"]), None) 518 558 append_chat_event( 519 559 "talent_errored", ··· 528 568 "name": action["target"], 529 569 "reason": CHAT_TROUBLE_REASON, 530 570 } 531 - _current_chat_state["raw_use_id"] = _reserve_use_id_locked() 571 + _set_current_raw_use_locked( 572 + str(action["logical_use_id"]), 573 + _reserve_use_id_locked(), 574 + ) 532 575 _current_chat_state["retry_count"] = 0 533 576 next_info = _build_spawn_info_locked(action["logical_use_id"]) 534 577 _run_next_action(next_info) ··· 539 582 with _state_lock: 540 583 append_chat_event("chat_error", reason=reason, use_id=logical_use_id) 541 584 if _current_chat_use_id == logical_use_id: 585 + if _current_chat_state is not None: 586 + _cancel_watchdog_locked( 587 + str(_current_chat_state.get("raw_use_id") or "") 588 + ) 542 589 next_info = _clear_current_locked() 543 590 _emit_error(logical_use_id, reason) 544 591 _run_next_action(next_info) ··· 573 620 raw_use_id = _reserve_use_id_locked() 574 621 _current_chat_use_id = logical_use_id 575 622 _current_chat_state = { 576 - "raw_use_id": raw_use_id, 623 + "raw_use_id": None, 577 624 "trigger": dict(trigger), 578 625 "location": dict(location), 579 626 "retry_count": 0, 580 627 } 628 + _set_current_raw_use_locked(logical_use_id, raw_use_id) 581 629 return _build_spawn_info_locked(logical_use_id) 582 630 583 631 ··· 618 666 dict(queued["trigger"]), 619 667 dict(queued["location"]), 620 668 ) 669 + 670 + 671 + def _arm_watchdog_locked(use_id: str, kind: str, logical_use_id: str) -> None: 672 + _cancel_watchdog_locked(use_id) 673 + timer = threading.Timer( 674 + _CHAT_WATCHDOG_SECONDS, 675 + _on_watchdog_timeout, 676 + args=(use_id, kind, logical_use_id), 677 + ) 678 + timer.daemon = True 679 + _watchdog_timers[use_id] = timer 680 + timer.start() 681 + 682 + 683 + def _cancel_watchdog_locked(use_id: str | None) -> None: 684 + if not use_id: 685 + return 686 + timer = _watchdog_timers.pop(str(use_id), None) 687 + if timer is not None: 688 + timer.cancel() 689 + 690 + 691 + def _set_current_raw_use_locked(logical_use_id: str, raw_use_id: str | None) -> None: 692 + assert _current_chat_state is not None 693 + _cancel_watchdog_locked(str(_current_chat_state.get("raw_use_id") or "")) 694 + _current_chat_state["raw_use_id"] = raw_use_id 695 + if raw_use_id is not None: 696 + _arm_watchdog_locked(str(raw_use_id), "chat", logical_use_id) 697 + 698 + 699 + def _on_watchdog_timeout(use_id: str, kind: str, logical_use_id: str) -> None: 700 + next_info: dict[str, Any] | None = None 701 + should_emit = False 702 + 703 + with _state_lock: 704 + _watchdog_timers.pop(use_id, None) 705 + 706 + if kind == "chat": 707 + if _current_chat_use_id != logical_use_id or _current_chat_state is None: 708 + return 709 + if str(_current_chat_state.get("raw_use_id") or "") != use_id: 710 + return 711 + logger.warning( 712 + "chat watchdog timed out use_id=%s kind=%s logical_use_id=%s", 713 + use_id, 714 + kind, 715 + logical_use_id, 716 + ) 717 + append_chat_event( 718 + "chat_error", 719 + reason=CHAT_WATCHDOG_REASON, 720 + use_id=logical_use_id, 721 + ) 722 + next_info = _clear_current_locked() 723 + should_emit = True 724 + elif kind == "talent": 725 + talent_state = _active_talents.get(use_id) 726 + if ( 727 + talent_state is None 728 + or str(talent_state.get("chat_use_id")) != logical_use_id 729 + ): 730 + return 731 + logger.warning( 732 + "chat watchdog timed out use_id=%s kind=%s logical_use_id=%s", 733 + use_id, 734 + kind, 735 + logical_use_id, 736 + ) 737 + _active_talents.pop(use_id, None) 738 + append_chat_event( 739 + "chat_error", 740 + reason=CHAT_WATCHDOG_REASON, 741 + use_id=logical_use_id, 742 + ) 743 + if ( 744 + _current_chat_use_id == logical_use_id 745 + and _current_chat_state is not None 746 + and not _current_chat_state.get("raw_use_id") 747 + ): 748 + next_info = _clear_current_locked() 749 + should_emit = True 750 + else: 751 + return 752 + 753 + if should_emit: 754 + _emit_error(logical_use_id, CHAT_WATCHDOG_REASON) 755 + _run_next_action(next_info) 621 756 622 757 623 758 def _active_talent_count_for_today_locked() -> int:
+250
tests/test_chat_runtime.py
··· 16 16 chat_module._current_chat_state = None 17 17 chat_module._queued_trigger = None 18 18 chat_module._active_talents.clear() 19 + for timer in chat_module._watchdog_timers.values(): 20 + timer.cancel() 21 + chat_module._watchdog_timers.clear() 19 22 chat_module._last_use_id = 0 20 23 21 24 ··· 24 27 journal.mkdir() 25 28 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 26 29 return journal 30 + 31 + 32 + def _install_fake_timers(monkeypatch): 33 + timers: list[FakeTimer] = [] 34 + 35 + class FakeTimer: 36 + def __init__(self, interval, function, args=None, kwargs=None): 37 + self.interval = interval 38 + self.function = function 39 + self.args = tuple(args or ()) 40 + self.kwargs = dict(kwargs or {}) 41 + self.started = False 42 + self.cancelled = False 43 + self.daemon = False 44 + timers.append(self) 45 + 46 + def start(self) -> None: 47 + self.started = True 48 + 49 + def cancel(self) -> None: 50 + self.cancelled = True 51 + 52 + def fire(self) -> None: 53 + if self.cancelled: 54 + return 55 + self.function(*self.args, **self.kwargs) 56 + 57 + monkeypatch.setattr("convey.chat.threading.Timer", FakeTimer) 58 + return timers 27 59 28 60 29 61 def test_chat_result_with_two_active_talents_retriggers_with_max_active_reason( ··· 298 330 e for e in read_chat_events(chat._today_day()) if e["kind"] == "chat_error" 299 331 ] 300 332 assert errors[-1]["use_id"] == "1713625000000" 333 + 334 + 335 + def test_exec_dispatch_appends_sol_message_and_spawns_talent_real_path( 336 + tmp_path, monkeypatch 337 + ): 338 + import convey.chat as chat 339 + 340 + _setup_journal(tmp_path, monkeypatch) 341 + _reset_chat_state(chat) 342 + timers = _install_fake_timers(monkeypatch) 343 + 344 + spawn_calls: list[dict[str, object]] = [] 345 + monkeypatch.setattr("convey.chat._emit_cortex_event", lambda *args, **kwargs: None) 346 + 347 + def fake_spawn_agent(prompt, name, provider=None, config=None, use_id=None): 348 + spawn_calls.append( 349 + { 350 + "prompt": prompt, 351 + "name": name, 352 + "provider": provider, 353 + "config": config, 354 + "use_id": use_id, 355 + } 356 + ) 357 + return use_id 358 + 359 + monkeypatch.setattr("convey.utils.spawn_agent", fake_spawn_agent) 360 + 361 + with chat._state_lock: 362 + start_info = chat._activate_current_locked( 363 + "1713625500000", 364 + {"type": "owner_message", "message": "help"}, 365 + {"app": "sol", "path": "/app/sol", "facet": "work"}, 366 + ) 367 + 368 + raw_use_id = start_info["raw_use_id"] 369 + chat._on_cortex_finish( 370 + { 371 + "use_id": raw_use_id, 372 + "result": ( 373 + '{"message":"I am looking into that.","notes":"need exec",' 374 + '"talent_request":{"target":"exec","task":"research it",' 375 + '"context":{"k":"v"}}}' 376 + ), 377 + } 378 + ) 379 + 380 + events = read_chat_events(chat._today_day()) 381 + sol_messages = [event for event in events if event["kind"] == "sol_message"] 382 + spawned_events = [event for event in events if event["kind"] == "talent_spawned"] 383 + 384 + assert sol_messages[-1]["text"] == "I am looking into that." 385 + assert sol_messages[-1]["requested_target"] == "exec" 386 + assert sol_messages[-1]["requested_task"] == "research it" 387 + assert spawned_events[-1]["name"] == "exec" 388 + assert spawned_events[-1]["task"] == "research it" 389 + assert spawn_calls == [ 390 + { 391 + "prompt": "Task: research it\n\nContext hints:\n{'k': 'v'}\n\n" 392 + "Location: app=sol path=/app/sol facet=work\n\n" 393 + "Recent chat:\n**Sol**: I am looking into that.", 394 + "name": "exec", 395 + "provider": None, 396 + "config": { 397 + "app": "sol", 398 + "path": "/app/sol", 399 + "facet": "work", 400 + "chat_parent_use_id": "1713625500000", 401 + }, 402 + "use_id": spawned_events[-1]["use_id"], 403 + } 404 + ] 405 + assert len(timers) == 2 406 + assert timers[0].cancelled is True 407 + with chat._state_lock: 408 + assert spawned_events[-1]["use_id"] in chat._active_talents 409 + 410 + 411 + def test_chat_watchdog_times_out_current_chat_generate(tmp_path, monkeypatch): 412 + import convey.chat as chat 413 + 414 + _setup_journal(tmp_path, monkeypatch) 415 + _reset_chat_state(chat) 416 + timers = _install_fake_timers(monkeypatch) 417 + 418 + emitted_errors: list[tuple[str, str]] = [] 419 + run_actions: list[dict | None] = [] 420 + monkeypatch.setattr( 421 + "convey.chat._emit_error", 422 + lambda use_id, reason: emitted_errors.append((use_id, reason)), 423 + ) 424 + monkeypatch.setattr( 425 + "convey.chat._run_next_action", lambda action: run_actions.append(action) 426 + ) 427 + 428 + with chat._state_lock: 429 + start_info = chat._activate_current_locked( 430 + "1713628000000", 431 + {"type": "owner_message", "message": "help"}, 432 + {"app": "sol", "path": "/app/sol", "facet": "work"}, 433 + ) 434 + 435 + raw_use_id = start_info["raw_use_id"] 436 + assert raw_use_id in chat._watchdog_timers 437 + 438 + timers[-1].fire() 439 + 440 + errors = [ 441 + event 442 + for event in read_chat_events(chat._today_day()) 443 + if event["kind"] == "chat_error" 444 + ] 445 + assert emitted_errors == [("1713628000000", "chat took too long — try again")] 446 + assert run_actions == [None] 447 + assert errors[-1]["use_id"] == "1713628000000" 448 + assert errors[-1]["reason"] == "chat took too long — try again" 449 + with chat._state_lock: 450 + assert chat._current_chat_use_id is None 451 + assert chat._current_chat_state is None 452 + assert raw_use_id not in chat._watchdog_timers 453 + 454 + 455 + def test_chat_watchdog_times_out_active_talent_and_clears_blocked_chat( 456 + tmp_path, monkeypatch 457 + ): 458 + import convey.chat as chat 459 + 460 + _setup_journal(tmp_path, monkeypatch) 461 + _reset_chat_state(chat) 462 + timers = _install_fake_timers(monkeypatch) 463 + 464 + emitted_errors: list[tuple[str, str]] = [] 465 + monkeypatch.setattr("convey.chat._emit_cortex_event", lambda *args, **kwargs: None) 466 + monkeypatch.setattr( 467 + "convey.chat._emit_error", 468 + lambda use_id, reason: emitted_errors.append((use_id, reason)), 469 + ) 470 + monkeypatch.setattr( 471 + "convey.utils.spawn_agent", lambda *args, **kwargs: kwargs["use_id"] 472 + ) 473 + 474 + with chat._state_lock: 475 + chat._current_chat_use_id = "1713629000000" 476 + chat._current_chat_state = { 477 + "raw_use_id": None, 478 + "trigger": {"type": "owner_message", "message": "help"}, 479 + "location": {"app": "sol", "path": "/app/sol", "facet": "work"}, 480 + "retry_count": 0, 481 + } 482 + chat._active_talents["1713629000001"] = { 483 + "chat_use_id": "1713629000000", 484 + "target": "exec", 485 + "task": "summarize", 486 + "location": {"app": "sol", "path": "/app/sol", "facet": "work"}, 487 + } 488 + 489 + chat._run_next_action( 490 + { 491 + "kind": "talent", 492 + "logical_use_id": "1713629000000", 493 + "target": "exec", 494 + "use_id": "1713629000001", 495 + "task": "summarize", 496 + "context": {}, 497 + "location": {"app": "sol", "path": "/app/sol", "facet": "work"}, 498 + } 499 + ) 500 + 501 + assert "1713629000001" in chat._watchdog_timers 502 + timers[-1].fire() 503 + 504 + errors = [ 505 + event 506 + for event in read_chat_events(chat._today_day()) 507 + if event["kind"] == "chat_error" 508 + ] 509 + assert emitted_errors == [("1713629000000", "chat took too long — try again")] 510 + assert errors[-1]["use_id"] == "1713629000000" 511 + assert errors[-1]["reason"] == "chat took too long — try again" 512 + with chat._state_lock: 513 + assert "1713629000001" not in chat._active_talents 514 + assert chat._current_chat_use_id is None 515 + assert chat._current_chat_state is None 516 + assert "1713629000001" not in chat._watchdog_timers 517 + 518 + 519 + def test_cortex_finish_logs_warning_for_unrouteable_use_id( 520 + tmp_path, monkeypatch, caplog 521 + ): 522 + import convey.chat as chat 523 + 524 + _setup_journal(tmp_path, monkeypatch) 525 + _reset_chat_state(chat) 526 + 527 + with caplog.at_level("WARNING"): 528 + chat._on_cortex_finish({"use_id": "1713630000000", "result": "done"}) 529 + 530 + assert ( 531 + "unrouteable cortex event use_id=1713630000000 event=finish " 532 + "reason=no matching active chat-generate or talent" 533 + ) in caplog.text 534 + 535 + 536 + def test_cortex_error_logs_warning_for_unrouteable_use_id( 537 + tmp_path, monkeypatch, caplog 538 + ): 539 + import convey.chat as chat 540 + 541 + _setup_journal(tmp_path, monkeypatch) 542 + _reset_chat_state(chat) 543 + 544 + with caplog.at_level("WARNING"): 545 + chat._on_cortex_error({"use_id": "1713631000000", "error": "boom"}) 546 + 547 + assert ( 548 + "unrouteable cortex event use_id=1713631000000 event=error " 549 + "reason=no matching active chat-generate or talent" 550 + ) in caplog.text 301 551 302 552 303 553 def test_parse_chat_result_accepts_reflection_target():