personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

convey/chat: drop superseded raw cortex events and gate runtime to reloader child

Stale cortex finish/error events for a rotated raw_use_id were falling into the unrouteable warning path while the owner's chat turn was still in flight, leaving the owner without a sol_message or chat_error. Track raw_use_ids per turn and silently drop superseded deliveries; narrow the warning to genuinely unknown ids. Also skip runtime startup in the Werkzeug reloader parent so debug mode doesn't get duplicate listeners.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+230
+39
convey/chat.py
··· 9 9 import atexit 10 10 import json 11 11 import logging 12 + import os 12 13 import pprint 13 14 import re 14 15 import threading ··· 153 154 """Start the chat backend runtime and subscribe to cortex events.""" 154 155 global _runtime, _atexit_registered 155 156 157 + if app.debug and os.environ.get("WERKZEUG_RUN_MAIN") != "true": 158 + logger.info("skipping chat runtime startup in Werkzeug reloader parent") 159 + app.chat_runtime_started = False 160 + return 161 + 156 162 with _runtime_lock: 157 163 if _runtime is None: 158 164 runtime = ChatRuntimeState(callosum=CallosumConnection()) ··· 238 244 elif use_id in _active_talents: 239 245 logical_use_id = str(_active_talents[use_id]["chat_use_id"]) 240 246 _refresh_watchdog_locked(use_id, "talent", logical_use_id) 247 + elif _is_superseded_raw_use_id_locked(use_id): 248 + logger.debug( 249 + "superseded raw cortex event use_id=%s event=%s reason=%s", 250 + use_id, 251 + str(message.get("event") or "progress"), 252 + "raw rotated", 253 + ) 241 254 242 255 if logical_use_id is None: 243 256 return ··· 400 413 ) 401 414 _current_chat_state["retry_count"] = 0 402 415 next_info = _build_spawn_info_locked(logical_use_id) 416 + elif _is_superseded_raw_use_id_locked(use_id): 417 + logger.debug( 418 + "superseded raw cortex event use_id=%s event=%s reason=%s", 419 + use_id, 420 + "finish", 421 + "raw rotated", 422 + ) 403 423 else: 404 424 logger.warning( 405 425 "unrouteable cortex event use_id=%s event=%s reason=%s", ··· 463 483 ) 464 484 _current_chat_state["retry_count"] = 0 465 485 next_info = _build_spawn_info_locked(logical_use_id) 486 + elif _is_superseded_raw_use_id_locked(use_id): 487 + logger.debug( 488 + "superseded raw cortex event use_id=%s event=%s reason=%s", 489 + use_id, 490 + "error", 491 + "raw rotated", 492 + ) 466 493 else: 467 494 logger.warning( 468 495 "unrouteable cortex event use_id=%s event=%s reason=%s", ··· 677 704 _current_chat_use_id = logical_use_id 678 705 _current_chat_state = { 679 706 "raw_use_id": None, 707 + "raw_use_ids_seen": set(), 680 708 "trigger": dict(trigger), 681 709 "location": dict(location), 682 710 "retry_count": 0, ··· 753 781 def _set_current_raw_use_locked(logical_use_id: str, raw_use_id: str | None) -> None: 754 782 assert _current_chat_state is not None 755 783 _cancel_watchdog_locked(str(_current_chat_state.get("raw_use_id") or "")) 784 + if raw_use_id is not None: 785 + _current_chat_state["raw_use_ids_seen"].add(str(raw_use_id)) 756 786 _current_chat_state["raw_use_id"] = raw_use_id 757 787 if raw_use_id is not None: 758 788 _arm_watchdog_locked(str(raw_use_id), "chat", logical_use_id) 789 + 790 + 791 + def _is_superseded_raw_use_id_locked(use_id: str) -> bool: 792 + if _current_chat_state is None: 793 + return False 794 + raw_chat_use_id = str(_current_chat_state.get("raw_use_id") or "") 795 + if use_id == raw_chat_use_id: 796 + return False 797 + return use_id in _current_chat_state["raw_use_ids_seen"] 759 798 760 799 761 800 def _on_watchdog_timeout(use_id: str, kind: str, logical_use_id: str) -> None:
+191
tests/test_chat_runtime.py
··· 134 134 chat._current_chat_use_id = "1713620000100" 135 135 chat._current_chat_state = { 136 136 "raw_use_id": "1713620000101", 137 + "raw_use_ids_seen": {"1713620000101"}, 137 138 "trigger": {"type": "owner_message", "message": "help"}, 138 139 "location": {"app": "sol", "path": "/app/sol", "facet": "work"}, 139 140 "retry_count": 0, ··· 210 211 chat._current_chat_use_id = "1713621999999" 211 212 chat._current_chat_state = { 212 213 "raw_use_id": "1713622000000", 214 + "raw_use_ids_seen": {"1713622000000"}, 213 215 "trigger": {"type": "talent_finished", "summary": "summary 2"}, 214 216 "location": {"app": "sol", "path": "/app/sol", "facet": "work"}, 215 217 "retry_count": 0, ··· 252 254 chat._current_chat_use_id = "1713623000000" 253 255 chat._current_chat_state = { 254 256 "raw_use_id": None, 257 + "raw_use_ids_seen": set(), 255 258 "trigger": {"type": "owner_message", "message": "help"}, 256 259 "location": {"app": "sol", "path": "/app/sol", "facet": "work"}, 257 260 "retry_count": 0, ··· 276 279 chat._current_chat_use_id = "1713624000000" 277 280 chat._current_chat_state = { 278 281 "raw_use_id": None, 282 + "raw_use_ids_seen": set(), 279 283 "trigger": {"type": "owner_message", "message": "help"}, 280 284 "location": {"app": "sol", "path": "/app/sol", "facet": "work"}, 281 285 "retry_count": 0, ··· 328 332 assert len(starts) == 1 329 333 330 334 335 + def test_start_chat_runtime_skips_debug_reloader_parent(tmp_path, monkeypatch, caplog): 336 + import convey.chat as chat 337 + 338 + _setup_journal(tmp_path, monkeypatch) 339 + _reset_chat_state(chat) 340 + 341 + starts: list[object] = [] 342 + monkeypatch.setattr( 343 + "convey.chat.CallosumConnection.start", 344 + lambda self, callback=None: starts.append(callback), 345 + ) 346 + monkeypatch.setattr("convey.chat.CallosumConnection.stop", lambda self: None) 347 + 348 + app = Flask(__name__) 349 + app.debug = True 350 + 351 + monkeypatch.delenv("WERKZEUG_RUN_MAIN", raising=False) 352 + with caplog.at_level("INFO"): 353 + chat.start_chat_runtime(app) 354 + 355 + assert chat._runtime is None 356 + assert starts == [] 357 + assert app.chat_runtime_started is False 358 + assert "skipping chat runtime startup in Werkzeug reloader parent" in caplog.text 359 + 360 + monkeypatch.setenv("WERKZEUG_RUN_MAIN", "true") 361 + chat.start_chat_runtime(app) 362 + 363 + assert chat._runtime is not None 364 + assert len(starts) == 1 365 + assert app.chat_runtime_started is True 366 + 367 + 331 368 def test_recover_active_talents_repopulates_from_chat_stream(tmp_path, monkeypatch): 332 369 import convey.chat as chat 333 370 ··· 381 418 chat._current_chat_use_id = chat_use_id 382 419 chat._current_chat_state = { 383 420 "raw_use_id": None, 421 + "raw_use_ids_seen": set(), 384 422 "trigger": {"type": "owner_message", "message": "help"}, 385 423 "location": {"app": "home", "path": "/app/home", "facet": "work"}, 386 424 "retry_count": 0, ··· 443 481 chat._current_chat_use_id = "1713625000000" 444 482 chat._current_chat_state = { 445 483 "raw_use_id": "1713625000001", 484 + "raw_use_ids_seen": {"1713625000001"}, 446 485 "trigger": {"type": "owner_message", "message": "help"}, 447 486 "location": {"app": "sol", "path": "/app/sol", "facet": "work"}, 448 487 "retry_count": 0, ··· 466 505 assert errors[-1]["use_id"] == "1713625000000" 467 506 468 507 508 + def test_superseded_raw_finish_after_retry_is_dropped_without_warning( 509 + tmp_path, monkeypatch, caplog 510 + ): 511 + import convey.chat as chat 512 + 513 + _setup_journal(tmp_path, monkeypatch) 514 + _reset_chat_state(chat) 515 + 516 + actions: list[dict | None] = [] 517 + monkeypatch.setattr( 518 + "convey.chat._run_next_action", lambda action: actions.append(action) 519 + ) 520 + monkeypatch.setattr("convey.chat._emit_finish", lambda *args, **kwargs: None) 521 + monkeypatch.setattr("convey.chat._emit_error", lambda *args, **kwargs: None) 522 + 523 + raw_use_id = "1713625100001" 524 + with chat._state_lock: 525 + chat._current_chat_use_id = "1713625100000" 526 + chat._current_chat_state = { 527 + "raw_use_id": raw_use_id, 528 + "raw_use_ids_seen": {raw_use_id}, 529 + "trigger": {"type": "owner_message", "message": "help"}, 530 + "location": {"app": "sol", "path": "/app/sol", "facet": "work"}, 531 + "retry_count": 0, 532 + } 533 + 534 + chat._on_cortex_finish({"use_id": raw_use_id, "result": "not json"}) 535 + 536 + with chat._state_lock: 537 + retry_use_id = str(chat._current_chat_state["raw_use_id"]) 538 + 539 + events_before = list(read_chat_events(chat._today_day())) 540 + with caplog.at_level("DEBUG"): 541 + chat._on_cortex_finish({"use_id": raw_use_id, "result": "still not json"}) 542 + 543 + assert ( 544 + "superseded raw cortex event use_id=1713625100001 event=finish reason=raw rotated" 545 + in caplog.text 546 + ) 547 + assert "unrouteable cortex event" not in caplog.text 548 + assert read_chat_events(chat._today_day()) == events_before 549 + 550 + chat._on_cortex_finish( 551 + { 552 + "use_id": retry_use_id, 553 + "result": '{"message":"done","notes":"ok","talent_request":null}', 554 + } 555 + ) 556 + 557 + sol_messages = [ 558 + event 559 + for event in read_chat_events(chat._today_day()) 560 + if event["kind"] == "sol_message" 561 + ] 562 + assert sol_messages[-1]["text"] == "done" 563 + 564 + 565 + def test_superseded_raw_error_after_followup_rotation_is_dropped_without_warning( 566 + tmp_path, monkeypatch, caplog 567 + ): 568 + import convey.chat as chat 569 + 570 + _setup_journal(tmp_path, monkeypatch) 571 + _reset_chat_state(chat) 572 + 573 + actions: list[dict | None] = [] 574 + monkeypatch.setattr( 575 + "convey.chat._run_next_action", lambda action: actions.append(action) 576 + ) 577 + monkeypatch.setattr("convey.chat._emit_finish", lambda *args, **kwargs: None) 578 + monkeypatch.setattr("convey.chat._emit_error", lambda *args, **kwargs: None) 579 + 580 + stale_raw_use_id = "1713625200001" 581 + with chat._state_lock: 582 + chat._current_chat_use_id = "1713625200000" 583 + chat._current_chat_state = { 584 + "raw_use_id": None, 585 + "raw_use_ids_seen": {stale_raw_use_id}, 586 + "trigger": {"type": "owner_message", "message": "help"}, 587 + "location": {"app": "sol", "path": "/app/sol", "facet": "work"}, 588 + "retry_count": 0, 589 + } 590 + chat._active_talents["1713625200002"] = { 591 + "chat_use_id": "1713625200000", 592 + "target": "exec", 593 + "task": "summarize", 594 + "location": {"app": "sol", "path": "/app/sol", "facet": "work"}, 595 + } 596 + 597 + chat._on_cortex_finish({"use_id": "1713625200002", "result": "summary"}) 598 + 599 + with chat._state_lock: 600 + followup_use_id = str(chat._current_chat_state["raw_use_id"]) 601 + 602 + events_before = list(read_chat_events(chat._today_day())) 603 + with caplog.at_level("DEBUG"): 604 + chat._on_cortex_error({"use_id": stale_raw_use_id, "error": "boom"}) 605 + 606 + assert ( 607 + "superseded raw cortex event use_id=1713625200001 event=error reason=raw rotated" 608 + in caplog.text 609 + ) 610 + assert "unrouteable cortex event" not in caplog.text 611 + assert read_chat_events(chat._today_day()) == events_before 612 + assert actions[0]["trigger"]["type"] == "talent_finished" 613 + 614 + chat._on_cortex_finish( 615 + { 616 + "use_id": followup_use_id, 617 + "result": '{"message":"wrapped up","notes":"ok","talent_request":null}', 618 + } 619 + ) 620 + 621 + sol_messages = [ 622 + event 623 + for event in read_chat_events(chat._today_day()) 624 + if event["kind"] == "sol_message" 625 + ] 626 + assert sol_messages[-1]["text"] == "wrapped up" 627 + 628 + 629 + def test_unknown_raw_use_id_still_warns(tmp_path, monkeypatch, caplog): 630 + import convey.chat as chat 631 + 632 + _setup_journal(tmp_path, monkeypatch) 633 + _reset_chat_state(chat) 634 + 635 + with chat._state_lock: 636 + chat._current_chat_use_id = "1713625300000" 637 + chat._current_chat_state = { 638 + "raw_use_id": "1713625300002", 639 + "raw_use_ids_seen": {"1713625300001", "1713625300002"}, 640 + "trigger": {"type": "owner_message", "message": "help"}, 641 + "location": {"app": "sol", "path": "/app/sol", "facet": "work"}, 642 + "retry_count": 0, 643 + } 644 + 645 + with caplog.at_level("WARNING"): 646 + chat._on_cortex_finish({"use_id": "1713625300009", "result": "done"}) 647 + 648 + assert ( 649 + "unrouteable cortex event use_id=1713625300009 event=finish " 650 + "reason=no matching active chat-generate or talent" 651 + ) in caplog.text 652 + 653 + 469 654 def test_exec_dispatch_appends_sol_message_and_spawns_talent_real_path( 470 655 tmp_path, monkeypatch 471 656 ): ··· 582 767 chat._current_chat_use_id = "1713627850000" 583 768 chat._current_chat_state = { 584 769 "raw_use_id": "1713627850001", 770 + "raw_use_ids_seen": {"1713627850001"}, 585 771 "trigger": {"type": "owner_message", "message": "help"}, 586 772 "location": {"app": "sol", "path": "/app/sol", "facet": "work"}, 587 773 "retry_count": 0, ··· 611 797 chat._current_chat_use_id = "1713627900000" 612 798 chat._current_chat_state = { 613 799 "raw_use_id": None, 800 + "raw_use_ids_seen": set(), 614 801 "trigger": {"type": "owner_message", "message": "help"}, 615 802 "location": {"app": "sol", "path": "/app/sol", "facet": "work"}, 616 803 "retry_count": 0, ··· 760 947 chat._current_chat_use_id = "1713629000000" 761 948 chat._current_chat_state = { 762 949 "raw_use_id": None, 950 + "raw_use_ids_seen": set(), 763 951 "trigger": {"type": "owner_message", "message": "help"}, 764 952 "location": {"app": "sol", "path": "/app/sol", "facet": "work"}, 765 953 "retry_count": 0, ··· 830 1018 chat._current_chat_use_id = logical_use_id 831 1019 chat._current_chat_state = { 832 1020 "raw_use_id": None, 1021 + "raw_use_ids_seen": set(), 833 1022 "trigger": {"type": "owner_message", "message": "help"}, 834 1023 "location": {"app": "sol", "path": "/app/sol", "facet": "work"}, 835 1024 "retry_count": 0, ··· 980 1169 chat._current_chat_use_id = "1713626000000" 981 1170 chat._current_chat_state = { 982 1171 "raw_use_id": "1713626000001", 1172 + "raw_use_ids_seen": {"1713626000001"}, 983 1173 "trigger": {"type": "owner_message", "message": "help"}, 984 1174 "location": {"app": "sol", "path": "/app/sol", "facet": "work"}, 985 1175 "retry_count": 0, ··· 1023 1213 chat._current_chat_use_id = "1713627000000" 1024 1214 chat._current_chat_state = { 1025 1215 "raw_use_id": None, 1216 + "raw_use_ids_seen": set(), 1026 1217 "trigger": {"type": "owner_message", "message": "help"}, 1027 1218 "location": {"app": "sol", "path": "/app/sol", "facet": "work"}, 1028 1219 "retry_count": 0,