personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

convey/chat: rebuild active talents on hydrate from chat stream

When convey restarts between talent_spawned and the matching
finish/error, walk today's chat events to repopulate _active_talents
from spawned-without-terminal entries. Late cortex events for those
use_ids resume the parent chat instead of falling through as
unrouteable.

Co-Authored-By: OpenAI Codex <codex@openai.com>

+182
+53
convey/chat.py
··· 305 305 notes=parsed["notes"], 306 306 requested_target=requested_target, 307 307 requested_task=requested_task, 308 + app=_current_chat_state["location"]["app"], 309 + path=_current_chat_state["location"]["path"], 310 + facet=_current_chat_state["location"]["facet"], 308 311 ) 309 312 _current_chat_state["retry_count"] = 0 310 313 _set_current_raw_use_locked(logical_use_id, None) ··· 591 594 _run_next_action(next_info) 592 595 593 596 597 + def _recover_active_talents_locked(day: str) -> None: 598 + events = read_chat_events(day) 599 + latest_sol_message: dict[str, Any] | None = None 600 + spawned: dict[str, dict[str, Any]] = {} 601 + 602 + for event in events: 603 + kind = event.get("kind") 604 + if kind == "sol_message": 605 + latest_sol_message = event 606 + continue 607 + if kind == "talent_spawned": 608 + use_id = str(event.get("use_id") or "") 609 + if not use_id: 610 + continue 611 + if latest_sol_message is None: 612 + logger.warning( 613 + "skipping active-talent recovery for %s: no parent sol_message", 614 + use_id, 615 + ) 616 + continue 617 + chat_use_id = str(latest_sol_message.get("use_id") or "") 618 + if not chat_use_id: 619 + logger.warning( 620 + "skipping active-talent recovery for %s: no parent sol_message", 621 + use_id, 622 + ) 623 + continue 624 + spawned[use_id] = { 625 + "chat_use_id": chat_use_id, 626 + "target": str(event.get("name") or ""), 627 + "task": str(event.get("task") or ""), 628 + "location": _normalize_location( 629 + latest_sol_message.get("app"), 630 + latest_sol_message.get("path"), 631 + latest_sol_message.get("facet"), 632 + ), 633 + } 634 + continue 635 + if kind in {"talent_finished", "talent_errored"}: 636 + spawned.pop(str(event.get("use_id") or ""), None) 637 + 638 + for use_id, state in spawned.items(): 639 + if use_id in _active_talents: 640 + continue 641 + _active_talents[use_id] = state 642 + if use_id not in _watchdog_timers: 643 + _arm_watchdog_locked(use_id, "talent", state["chat_use_id"]) 644 + 645 + 594 646 def _recover_chat_if_needed() -> None: 595 647 day = _today_day() 596 648 start_info: dict[str, Any] | None = None 597 649 598 650 with _state_lock: 651 + _recover_active_talents_locked(day) 599 652 if _current_chat_use_id is not None: 600 653 return 601 654 unresolved = find_unresponded_trigger(day)
+129
tests/test_chat_runtime.py
··· 3 3 4 4 from __future__ import annotations 5 5 6 + from datetime import datetime 7 + 6 8 import pytest 7 9 from flask import Flask 8 10 ··· 29 31 return journal 30 32 31 33 34 + def _ms(year: int, month: int, day: int, hour: int, minute: int, second: int) -> int: 35 + return int(datetime(year, month, day, hour, minute, second).timestamp() * 1000) 36 + 37 + 32 38 def _install_fake_timers(monkeypatch): 33 39 timers: list[FakeTimer] = [] 34 40 ··· 56 62 57 63 monkeypatch.setattr("convey.chat.threading.Timer", FakeTimer) 58 64 return timers 65 + 66 + 67 + def _append_recoverable_talent_events( 68 + chat_use_id: str, 69 + talent_use_id: str, 70 + *, 71 + target: str = "exec", 72 + task: str = "research it", 73 + ) -> None: 74 + now = datetime.now() 75 + start = _ms(now.year, now.month, now.day, 12, 0, 0) 76 + append_chat_event( 77 + "sol_message", 78 + ts=start, 79 + use_id=chat_use_id, 80 + text="I am looking into that.", 81 + notes="need exec", 82 + requested_target=target, 83 + requested_task=task, 84 + app="home", 85 + path="/app/home", 86 + facet="work", 87 + ) 88 + append_chat_event( 89 + "talent_spawned", 90 + ts=start + 1_000, 91 + use_id=talent_use_id, 92 + name=target, 93 + task=task, 94 + started_at=start + 1_000, 95 + ) 59 96 60 97 61 98 def test_chat_result_with_two_active_talents_retriggers_with_max_active_reason( ··· 284 321 chat.start_chat_runtime(app) 285 322 286 323 assert len(starts) == 1 324 + 325 + 326 + def test_recover_active_talents_repopulates_from_chat_stream(tmp_path, monkeypatch): 327 + import convey.chat as chat 328 + 329 + _setup_journal(tmp_path, monkeypatch) 330 + _reset_chat_state(chat) 331 + timers = _install_fake_timers(monkeypatch) 332 + day = datetime.now().strftime("%Y%m%d") 333 + monkeypatch.setattr("convey.chat._today_day", lambda: day) 334 + 335 + chat_use_id = "1713624500000" 336 + talent_use_id = "1713624500001" 337 + _append_recoverable_talent_events(chat_use_id, talent_use_id) 338 + 339 + chat._recover_chat_if_needed() 340 + 341 + with chat._state_lock: 342 + assert chat._active_talents[talent_use_id] == { 343 + "chat_use_id": chat_use_id, 344 + "target": "exec", 345 + "task": "research it", 346 + "location": {"app": "home", "path": "/app/home", "facet": "work"}, 347 + } 348 + assert talent_use_id in chat._watchdog_timers 349 + assert len(timers) == 1 350 + 351 + 352 + def test_late_talent_finish_after_recovery_routes_to_chat_continuation( 353 + tmp_path, monkeypatch, caplog 354 + ): 355 + import convey.chat as chat 356 + 357 + _setup_journal(tmp_path, monkeypatch) 358 + _reset_chat_state(chat) 359 + _install_fake_timers(monkeypatch) 360 + day = datetime.now().strftime("%Y%m%d") 361 + monkeypatch.setattr("convey.chat._today_day", lambda: day) 362 + 363 + chat_use_id = "1713624600000" 364 + talent_use_id = "1713624600001" 365 + _append_recoverable_talent_events(chat_use_id, talent_use_id) 366 + chat._recover_chat_if_needed() 367 + 368 + actions: list[dict | None] = [] 369 + monkeypatch.setattr( 370 + "convey.chat._run_next_action", lambda action: actions.append(action) 371 + ) 372 + monkeypatch.setattr("convey.chat._emit_finish", lambda *args, **kwargs: None) 373 + monkeypatch.setattr("convey.chat._emit_error", lambda *args, **kwargs: None) 374 + 375 + with chat._state_lock: 376 + chat._current_chat_use_id = chat_use_id 377 + chat._current_chat_state = { 378 + "raw_use_id": None, 379 + "trigger": {"type": "owner_message", "message": "help"}, 380 + "location": {"app": "home", "path": "/app/home", "facet": "work"}, 381 + "retry_count": 0, 382 + } 383 + 384 + with caplog.at_level("WARNING"): 385 + chat._on_cortex_finish({"use_id": talent_use_id, "result": "done"}) 386 + 387 + assert "unrouteable cortex event" not in caplog.text 388 + finished_events = [ 389 + e for e in read_chat_events(chat._today_day()) if e["kind"] == "talent_finished" 390 + ] 391 + assert finished_events[-1]["use_id"] == talent_use_id 392 + assert actions[-1]["logical_use_id"] == chat_use_id 393 + assert actions[-1]["trigger"]["type"] == "talent_finished" 394 + 395 + 396 + def test_recovery_is_idempotent_for_active_talents(tmp_path, monkeypatch): 397 + import convey.chat as chat 398 + 399 + _setup_journal(tmp_path, monkeypatch) 400 + _reset_chat_state(chat) 401 + timers = _install_fake_timers(monkeypatch) 402 + day = datetime.now().strftime("%Y%m%d") 403 + monkeypatch.setattr("convey.chat._today_day", lambda: day) 404 + 405 + chat_use_id = "1713624700000" 406 + talent_use_id = "1713624700001" 407 + _append_recoverable_talent_events(chat_use_id, talent_use_id) 408 + 409 + chat._recover_chat_if_needed() 410 + chat._recover_chat_if_needed() 411 + 412 + with chat._state_lock: 413 + assert list(chat._active_talents) == [talent_use_id] 414 + assert talent_use_id in chat._watchdog_timers 415 + assert len(timers) == 1 287 416 288 417 289 418 def test_chat_generate_schema_violation_retries_once_then_chat_errors(